You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2019/05/09 08:12:51 UTC
[servicecomb-kie] 04/06: add kv data access api
This is an automated email from the ASF dual-hosted git repository.
littlecui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-kie.git
commit 6afc216839e28680776a539407cfb30cf83ecd5d
Author: tian <xi...@gmail.com>
AuthorDate: Tue May 7 19:25:30 2019 +0800
add kv data access api
---
deployments/docker/docker-compose.yaml | 19 +++
go.mod | 16 ++
pkg/model/kv.go | 49 ++++++
pkg/model/kv_test.go | 45 +++++
server/config/config.go | 44 +++++
server/config/config_test.go | 51 ++++++
server/config/struct.go | 29 ++++
server/kv/errors.go | 35 ++++
server/kv/kv.go | 62 +++++++
server/kv/kv_test.go | 192 +++++++++++++++++++++
server/kv/model_suite_test.go | 44 +++++
server/kv/mongodb.go | 304 +++++++++++++++++++++++++++++++++
server/kv/options.go | 49 ++++++
13 files changed, 939 insertions(+)
diff --git a/deployments/docker/docker-compose.yaml b/deployments/docker/docker-compose.yaml
new file mode 100644
index 0000000..2c87590
--- /dev/null
+++ b/deployments/docker/docker-compose.yaml
@@ -0,0 +1,19 @@
+version: '3.1'
+services:
+ mongo:
+ image: mongo
+ restart: always
+ ports:
+ - 27017:27017
+ environment:
+ MONGO_INITDB_ROOT_USERNAME: kie
+ MONGO_INITDB_ROOT_PASSWORD: 123
+
+ mongo-express:
+ image: mongo-express
+ restart: always
+ ports:
+ - 8081:8081
+ environment:
+ ME_CONFIG_MONGODB_ADMINUSERNAME: kie
+ ME_CONFIG_MONGODB_ADMINPASSWORD: 123
\ No newline at end of file
diff --git a/go.mod b/go.mod
new file mode 100644
index 0000000..da3d3eb
--- /dev/null
+++ b/go.mod
@@ -0,0 +1,16 @@
+module github.com/apache/servicecomb-kie
+
+require (
+ github.com/go-chassis/go-archaius v0.14.0
+ github.com/go-chassis/go-chassis v1.4.0 // indirect
+ github.com/go-chassis/paas-lager v1.0.2-0.20190328010332-cf506050ddb2
+ github.com/go-mesh/openlogging v1.0.1-0.20181205082104-3d418c478b2d
+ github.com/onsi/ginkgo v1.8.0
+ github.com/onsi/gomega v1.5.0
+ github.com/stretchr/testify v1.2.2
+ github.com/urfave/cli v1.20.0 // indirect
+ github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c // indirect
+ github.com/xdg/stringprep v1.0.0 // indirect
+ go.mongodb.org/mongo-driver v1.0.0
+ gopkg.in/yaml.v2 v2.2.1
+)
diff --git a/pkg/model/kv.go b/pkg/model/kv.go
new file mode 100644
index 0000000..1c31fc0
--- /dev/null
+++ b/pkg/model/kv.go
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package model
+
+import (
+ "go.mongodb.org/mongo-driver/bson/primitive"
+)
+
+//Labels maps a label name to its label value
+type Labels map[string]string
+
+//func (m Labels) ToString() string {
+// sb := strings.Builder{}
+// for k, v := range m {
+// sb.WriteString(k + "=" + v + ",")
+// }
+// return sb.String()
+//}
+
+//KV is one configuration item: a key and its value, scoped by a domain and a set of labels.
+//Only ID declares a bson tag, the other fields declare json tags only.
+type KV struct {
+ ID primitive.ObjectID `json:"_id,omitempty" bson:"_id,omitempty"`
+ Key string `json:"key"`
+ Value string `json:"value"`
+ ValueType string `json:"valueType"` //ini,json,text,yaml,properties
+ Domain string `json:"domain"` //tenant info
+ Labels map[string]string `json:"labels,omitempty"` //key has labels
+ Checker string `json:"check,omitempty"` //python script
+ Revision int `json:"revision"`
+}
+//KVHistory is one revision record of a KV.
+//KID is serialized as "id" in json and as "kvID" in bson.
+type KVHistory struct {
+ KID string `json:"id,omitempty" bson:"kvID"`
+ Value string `json:"value"`
+ Checker string `json:"check,omitempty"` //python script
+ Revision int `json:"revision"`
+}
diff --git a/pkg/model/kv_test.go b/pkg/model/kv_test.go
new file mode 100644
index 0000000..cbb05c0
--- /dev/null
+++ b/pkg/model/kv_test.go
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package model_test
+
+import (
+ "encoding/json"
+ "github.com/apache/servicecomb-kie/pkg/model"
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+//TestKV_UnmarshalJSON checks that a KV can be marshaled to json
+//and that value and labels are restored when json is unmarshaled into a KV.
+func TestKV_UnmarshalJSON(t *testing.T) {
+	kv := &model.KV{
+		Value: "test",
+		Labels: map[string]string{
+			"test": "env",
+		},
+	}
+	//a marshal failure must fail the test instead of logging an empty string
+	b, err := json.Marshal(kv)
+	assert.NoError(t, err)
+	t.Log(string(b))
+
+	var kv2 model.KV
+	err = json.Unmarshal([]byte(`
+	{"value": "1","labels":{"test":"env"}}
+	`), &kv2)
+	assert.NoError(t, err)
+	assert.Equal(t, "env", kv2.Labels["test"])
+	assert.Equal(t, "1", kv2.Value)
+}
diff --git a/server/config/config.go b/server/config/config.go
new file mode 100644
index 0000000..7e03731
--- /dev/null
+++ b/server/config/config.go
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package config
+
+import (
+ "github.com/go-chassis/go-archaius"
+ "github.com/go-chassis/go-archaius/sources/file-source"
+ "gopkg.in/yaml.v2"
+ "path/filepath"
+)
+
+//configurations holds the config parsed by Init, it is nil until Init is called
+var configurations *Config
+
+//Init adds the file to archaius, reads its content back by file name
+//and parses that content as yaml into the package level configuration.
+//It returns the error of archaius or of the yaml parser.
+//The previously loaded configuration is left untouched when Init fails.
+func Init(file string) error {
+	if err := archaius.AddFile(file, archaius.WithFileHandler(filesource.UseFileNameAsKeyContentAsValue)); err != nil {
+		return err
+	}
+	//presumably the file handler stores the whole content under the bare file name
+	_, filename := filepath.Split(file)
+	content := archaius.GetString(filename, "")
+	//parse into a local value first, so that a broken file never publishes a half filled config
+	parsed := &Config{}
+	if err := yaml.Unmarshal([]byte(content), parsed); err != nil {
+		return err
+	}
+	configurations = parsed
+	return nil
+}
+
+//GetDB returns the db section of the loaded configuration.
+//It returns the zero value of DB if Init has not loaded a configuration yet.
+func GetDB() DB {
+	if configurations == nil {
+		return DB{}
+	}
+	return configurations.DB
+}
diff --git a/server/config/config_test.go b/server/config/config_test.go
new file mode 100644
index 0000000..75ca3ae
--- /dev/null
+++ b/server/config/config_test.go
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package config_test
+
+import (
+ "github.com/apache/servicecomb-kie/server/config"
+ "github.com/go-chassis/go-archaius"
+ "github.com/stretchr/testify/assert"
+ "io"
+ "os"
+ "testing"
+)
+
+//TestInit writes a yaml file to the working directory, loads it with config.Init
+//and checks the parsed pool size and uri. The file is removed when the test ends.
+func TestInit(t *testing.T) {
+	err := archaius.Init()
+	assert.NoError(t, err)
+	b := []byte(`
+db:
+ uri: mongodb://admin:123@127.0.0.1:27017/kie
+ type: mongodb
+ poolSize: 10
+ ssl: false
+ sslCA:
+ sslCert:
+
+`)
+	defer os.Remove("test.yaml")
+	f1, err := os.Create("test.yaml")
+	assert.NoError(t, err)
+	_, err = io.WriteString(f1, string(b))
+	assert.NoError(t, err)
+	//close the handle before the file is read back and removed
+	assert.NoError(t, f1.Close())
+	err = config.Init("test.yaml")
+	assert.NoError(t, err)
+	assert.Equal(t, 10, config.GetDB().PoolSize)
+	assert.Equal(t, "mongodb://admin:123@127.0.0.1:27017/kie", config.GetDB().URI)
+}
diff --git a/server/config/struct.go b/server/config/struct.go
new file mode 100644
index 0000000..cbfb644
--- /dev/null
+++ b/server/config/struct.go
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package config
+
+//Config is the root of the yaml configuration file
+type Config struct {
+ DB DB `yaml:"db"`
+}
+//DB is the "db" section of the yaml configuration file
+type DB struct {
+ URI string `yaml:"uri"`
+ PoolSize int `yaml:"poolSize"`
+ SSL bool `yaml:"ssl"`
+ CABundle []string `yaml:"sslCA"`
+ Cert string `yaml:"sslCert"`
+}
diff --git a/server/kv/errors.go b/server/kv/errors.go
new file mode 100644
index 0000000..958a015
--- /dev/null
+++ b/server/kv/errors.go
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kv
+
+import (
+ "errors"
+ "fmt"
+
+ "github.com/apache/servicecomb-kie/pkg/model"
+ "github.com/go-mesh/openlogging"
+)
+
+//ErrAction builds a biz error that describes which action failed
+//for which key, domain and labels, together with the text of the raw error.
+//The message is written to the error log before the new error is returned.
+//err must not be nil.
+func ErrAction(action, key string, labels model.Labels, domain string, err error) error {
+	cause := err.Error()
+	text := fmt.Sprintf("can not [%s] [%s] in [%s] with [%s],err: %s", action, key, domain, labels, cause)
+	openlogging.Error(text)
+	bizErr := errors.New(text)
+	return bizErr
+}
diff --git a/server/kv/kv.go b/server/kv/kv.go
new file mode 100644
index 0000000..9513e60
--- /dev/null
+++ b/server/kv/kv.go
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kv
+
+import (
+ "crypto/tls"
+ "errors"
+ "time"
+ "github.com/apache/servicecomb-kie/server/config"
+ "github.com/apache/servicecomb-kie/pkg/model"
+)
+
+//errors returned by the kv service, compare against them by identity
+var (
+	//ErrMissingDomain is returned if a call does not carry a domain
+	ErrMissingDomain = errors.New("domain info missing, illegal access")
+	//ErrNotExists is returned if no kv matches the key and labels
+	ErrNotExists = errors.New("key with labels does not exist")
+	//ErrTooMany is returned if more than one kv matches where exactly one is expected
+	ErrTooMany = errors.New("key with labels should be only one")
+	//ErrKeyMustNotEmpty is returned if an exact query is issued without a key
+	ErrKeyMustNotEmpty = errors.New("must supply key if you want to get exact one result")
+)
+
+//Service is the data access api of kv, MongodbService implements it
+type Service interface {
+ //CreateOrUpdate saves the kv and returns the saved kv
+ CreateOrUpdate(kv *model.KV) (*model.KV, error)
+ //do not use primitive.ObjectID as return to decouple with mongodb, we can afford perf lost
+ Exist(key, domain string, labels model.Labels) (string, error)
+ //DeleteByID deletes the kv with the given id
+ DeleteByID(id string) error
+ //Delete deletes the kv with the given key, domain and labels
+ Delete(key, domain string, labels model.Labels) error
+ //Find queries kvs in the domain, CallOption narrows the query
+ Find(domain string, options ...CallOption) ([]*model.KV, error)
+ //AddHistory records the current state of the kv as a history entry
+ AddHistory(kv *model.KV) error
+ //RollBack(kv *KV, version string) error
+}
+
+//Options is the setting used to create a kv Service.
+//NOTE(review): in the code shown here only URI and Timeout are read
+//when the mongodb service is created, confirm where PoolSize, SSL and TLS are consumed.
+type Options struct {
+ URI string
+ PoolSize int
+ SSL bool
+ TLS *tls.Config
+ Timeout time.Duration
+}
+
+//NewKVService creates a kv Service from the db section of the loaded configuration.
+//It copies the uri, pool size and ssl flag into Options and
+//delegates the creation to NewMongoService.
+func NewKVService() (Service, error) {
+	db := config.GetDB()
+	opts := Options{
+		URI:      db.URI,
+		PoolSize: db.PoolSize,
+		SSL:      db.SSL,
+	}
+	//TODO build opts.TLS when db.SSL is true, the ssl flag has no effect on the options yet
+	return NewMongoService(opts)
+}
diff --git a/server/kv/kv_test.go b/server/kv/kv_test.go
new file mode 100644
index 0000000..4bdd043
--- /dev/null
+++ b/server/kv/kv_test.go
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kv_test
+
+import (
+ . "github.com/apache/servicecomb-kie/pkg/model"
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+ "github.com/apache/servicecomb-kie/pkg/model"
+ "github.com/apache/servicecomb-kie/server/kv"
+)
+
+//Specs of the mongodb backed kv service.
+//They connect to mongodb://kie:123@127.0.0.1:27017 and share the service s.
+//NOTE(review): the service calls are written directly in the Describe and Context bodies,
+//not inside It, so the later specs rely on the data written by the earlier containers.
+var _ = Describe("Kv mongodb service", func() {
+ var s kv.Service
+ var err error
+ Describe("connecting db", func() {
+ s, err = kv.NewMongoService(kv.Options{
+ URI: "mongodb://kie:123@127.0.0.1:27017",
+ })
+ It("should not return err", func() {
+ Expect(err).Should(BeNil())
+ })
+ })
+
+ Describe("put kv timeout", func() {
+ Context("with labels app and service", func() {
+ //the local name kv shadows the imported package kv inside this Context
+ kv, err := s.CreateOrUpdate(&model.KV{
+ Key: "timeout",
+ Value: "2s",
+ Domain: "default",
+ Labels: map[string]string{
+ "app": "mall",
+ "service": "cart",
+ },
+ })
+ It("should not return err", func() {
+ Expect(err).Should(BeNil())
+ })
+ It("should has revision", func() {
+ Expect(kv.Revision).ShouldNot(BeZero())
+ })
+ It("should has ID", func() {
+ Expect(kv.ID.Hex()).ShouldNot(BeEmpty())
+ })
+
+ })
+ Context("with labels app, service and version", func() {
+ kv, err := s.CreateOrUpdate(&KV{
+ Key: "timeout",
+ Value: "2s",
+ Domain: "default",
+ Labels: map[string]string{
+ "app": "mall",
+ "service": "cart",
+ "version": "1.0.0",
+ },
+ })
+ //err is reassigned here, the error of CreateOrUpdate above is not asserted
+ oid, err := s.Exist("timeout", "default", map[string]string{
+ "app": "mall",
+ "service": "cart",
+ "version": "1.0.0",
+ })
+ It("should not return err", func() {
+ Expect(err).Should(BeNil())
+ })
+ It("should has revision", func() {
+ Expect(kv.Revision).ShouldNot(BeZero())
+ })
+ It("should has ID", func() {
+ Expect(kv.ID.Hex()).ShouldNot(BeEmpty())
+ })
+ It("should exist", func() {
+ Expect(oid).ShouldNot(BeEmpty())
+ })
+ })
+ Context("with labels app,and update value", func() {
+ beforeKV, err := s.CreateOrUpdate(&KV{
+ Key: "timeout",
+ Value: "1s",
+ Domain: "default",
+ Labels: map[string]string{
+ "app": "mall",
+ },
+ })
+ It("should not return err", func() {
+ Expect(err).Should(BeNil())
+ })
+ kvs1, err := s.Find("default", kv.WithKey("timeout"), kv.WithLabels(map[string]string{
+ "app": "mall",
+ }), kv.WithExactLabels())
+ It("should be 1s", func() {
+ Expect(kvs1[0].Value).Should(Equal(beforeKV.Value))
+ })
+ //same key, domain and labels as beforeKV, only the value differs
+ afterKV, err := s.CreateOrUpdate(&KV{
+ Key: "timeout",
+ Value: "3s",
+ Domain: "default",
+ Labels: map[string]string{
+ "app": "mall",
+ },
+ })
+ It("should has same id", func() {
+ Expect(afterKV.ID.Hex()).Should(Equal(beforeKV.ID.Hex()))
+ })
+ oid, err := s.Exist("timeout", "default", map[string]string{
+ "app": "mall",
+ })
+ It("should exists", func() {
+ Expect(oid).Should(Equal(beforeKV.ID.Hex()))
+ })
+ kvs, err := s.Find("default", kv.WithKey("timeout"), kv.WithLabels(map[string]string{
+ "app": "mall",
+ }), kv.WithExactLabels())
+ It("should be 3s", func() {
+ Expect(kvs[0].Value).Should(Equal(afterKV.Value))
+ })
+ })
+ })
+
+ Describe("greedy find by kv and labels", func() {
+ Context("with labels app ", func() {
+ kvs, err := s.Find("default", kv.WithKey("timeout"), kv.WithLabels(map[string]string{
+ "app": "mall",
+ }))
+ It("should not return err", func() {
+ Expect(err).Should(BeNil())
+ })
+ It("should has 3 records", func() {
+ Expect(len(kvs)).Should(Equal(3))
+ })
+
+ })
+ })
+ Describe("exact find by kv and labels", func() {
+ Context("with labels app ", func() {
+ kvs, err := s.Find("default", kv.WithKey("timeout"), kv.WithLabels(map[string]string{
+ "app": "mall",
+ }), kv.WithExactLabels())
+ It("should not return err", func() {
+ Expect(err).Should(BeNil())
+ })
+ It("should has 1 records", func() {
+ Expect(len(kvs)).Should(Equal(1))
+ })
+
+ })
+ })
+ Describe("exact find by labels", func() {
+ Context("with labels app ", func() {
+ kvs, err := s.Find("default", kv.WithLabels(map[string]string{
+ "app": "mall",
+ }), kv.WithExactLabels())
+ It("should not return err", func() {
+ Expect(err).Should(BeNil())
+ })
+ It("should has 1 records", func() {
+ Expect(len(kvs)).Should(Equal(1))
+ })
+
+ })
+ })
+ Describe("greedy find by labels", func() {
+ Context("with labels app ans service ", func() {
+ kvs, err := s.Find("default", kv.WithLabels(map[string]string{
+ "app": "mall",
+ "service": "cart",
+ }))
+ It("should not return err", func() {
+ Expect(err).Should(BeNil())
+ })
+ It("should has 2 records", func() {
+ Expect(len(kvs)).Should(Equal(2))
+ })
+
+ })
+ })
+})
diff --git a/server/kv/model_suite_test.go b/server/kv/model_suite_test.go
new file mode 100644
index 0000000..965802e
--- /dev/null
+++ b/server/kv/model_suite_test.go
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kv_test
+
+import (
+ "testing"
+
+ "github.com/go-chassis/paas-lager"
+ "github.com/go-mesh/openlogging"
+ . "github.com/onsi/ginkgo"
+ "github.com/onsi/ginkgo/reporters"
+ . "github.com/onsi/gomega"
+)
+
+//TestModel is the go test entry of the ginkgo suite,
+//it runs all specs of this package and writes a junit report to junit.xml.
+func TestModel(t *testing.T) {
+	RegisterFailHandler(Fail)
+	customReporters := []Reporter{
+		reporters.NewJUnitReporter("junit.xml"),
+	}
+	RunSpecsWithDefaultAndCustomReporters(t, "Model Suite", customReporters)
+}
+
+//set up a DEBUG level stdout logger and register it to openlogging before the specs run
+var _ = BeforeSuite(func() {
+	logConfig := log.Config{
+		Writers:     []string{"stdout"},
+		LoggerLevel: "DEBUG",
+	}
+	log.Init(logConfig)
+
+	openlogging.SetLogger(log.NewLogger("ut"))
+})
diff --git a/server/kv/mongodb.go b/server/kv/mongodb.go
new file mode 100644
index 0000000..37664ae
--- /dev/null
+++ b/server/kv/mongodb.go
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kv
+
+import (
+ "context"
+ "fmt"
+ "github.com/apache/servicecomb-kie/pkg/model"
+ "github.com/go-mesh/openlogging"
+ "go.mongodb.org/mongo-driver/bson"
+ "go.mongodb.org/mongo-driver/bson/primitive"
+ "go.mongodb.org/mongo-driver/mongo"
+ "go.mongodb.org/mongo-driver/mongo/options"
+ "time"
+)
+
+//client is the package wide mongodb client, getClient creates it on first use and reuses it afterwards
+var client *mongo.Client
+
+//names of the mongodb database and collections, and the defaults of the kv service
+const (
+ DB = "kie"
+ CollectionKV = "kv"
+ CollectionRevision = "revision"
+ DefaultTimeout = 5 * time.Second
+ DefaultValueType = "text"
+)
+
+//MongodbService operates kv data in mongodb.
+//timeout is set by NewMongoService from Options.Timeout.
+type MongodbService struct {
+ c *mongo.Client
+ timeout time.Duration
+}
+
+//CreateOrUpdate updates the kv whose key, domain and labels exactly match the given kv,
+//or inserts the given kv with revision 1 if there is no such kv.
+//It returns ErrMissingDomain if kv.Domain is empty.
+//The given kv is filled with the ID and revision and returned.
+//Recording the history of an insertion is best effort, a failure is only logged.
+func (s *MongodbService) CreateOrUpdate(kv *model.KV) (*model.KV, error) {
+	if kv.Domain == "" {
+		return nil, ErrMissingDomain
+	}
+	//cancel releases the resources of the timeout context as soon as the call returns
+	ctx, cancel := context.WithTimeout(context.Background(), DefaultTimeout)
+	defer cancel()
+	collection := s.c.Database(DB).Collection(CollectionKV)
+	oid, err := s.Exist(kv.Key, kv.Domain, kv.Labels)
+	//ErrNotExists is expected here, it means the kv must be inserted
+	if err != nil && err != ErrNotExists {
+		return nil, err
+	}
+	if oid != "" {
+		hex, err := primitive.ObjectIDFromHex(oid)
+		if err != nil {
+			openlogging.Error(fmt.Sprintf("convert %s ,err:%s", oid, err))
+			return nil, err
+		}
+		kv.ID = hex
+		if err := s.update(ctx, collection, kv); err != nil {
+			return nil, err
+		}
+		return kv, nil
+	}
+	if kv.ValueType == "" {
+		kv.ValueType = DefaultValueType
+	}
+	//set 1 to revision for insertion
+	kv.Revision = 1
+	res, err := collection.InsertOne(ctx, kv)
+	if err != nil {
+		return nil, err
+	}
+	//keep the ID of the kv if the driver reports an id of another type
+	if objectID, ok := res.InsertedID.(primitive.ObjectID); ok {
+		kv.ID = objectID
+	}
+	if err := s.AddHistory(kv); err != nil {
+		openlogging.Warn(
+			fmt.Sprintf("can not update version for [%s] [%s] in [%s]",
+				kv.Key, kv.Labels, kv.Domain))
+	}
+	openlogging.Debug(fmt.Sprintf("create %s with labels %s value [%s]", kv.Key, kv.Labels, kv.Value))
+	return kv, nil
+}
+
+//update gets the latest revision of the kv from its history,
+//increases the revision, writes value, revision and checker to the kv document
+//and then adds a new history record.
+//If the kv has no history yet, kv.Revision is written as given.
+//Adding the history record is best effort, a failure is only logged.
+func (s *MongodbService) update(ctx context.Context, collection *mongo.Collection, kv *model.KV) error {
+	h, err := s.getLatest(kv.ID)
+	if err != nil {
+		openlogging.Error(fmt.Sprintf("get latest [%s][%s] in [%s],err: %s",
+			kv.Key, kv.Labels, kv.Domain, err.Error()))
+		return err
+	}
+	if h != nil {
+		kv.Revision = h.Revision + 1
+	}
+	//only the mutable fields are set, key, domain and labels identify the kv and stay as they are
+	ur, err := collection.UpdateOne(ctx, bson.M{"_id": kv.ID}, bson.M{
+		"$set": bson.M{
+			"value":    kv.Value,
+			"revision": kv.Revision,
+			"checker":  kv.Checker,
+		},
+	})
+	if err != nil {
+		return err
+	}
+	openlogging.Debug(
+		fmt.Sprintf("update %s with labels %s value [%s] %d ",
+			kv.Key, kv.Labels, kv.Value, ur.ModifiedCount))
+	if err := s.AddHistory(kv); err != nil {
+		openlogging.Warn(
+			fmt.Sprintf("can not update version for [%s] [%s] in [%s]",
+				kv.Key, kv.Labels, kv.Domain))
+		//the kv itself is updated, do not report a history record that was not written
+		return nil
+	}
+	openlogging.Debug(
+		fmt.Sprintf("add history %s with labels %s value [%s] %d ",
+			kv.Key, kv.Labels, kv.Value, ur.ModifiedCount))
+	return nil
+}
+//Exist looks up the kv whose key and labels exactly match in the domain
+//and returns the hex string of its ID.
+//It returns the error of Find as it is, and ErrTooMany if Find does not return exactly one kv.
+func (s *MongodbService) Exist(key, domain string, labels model.Labels) (string, error) {
+	matched, err := s.Find(domain, WithExactLabels(), WithLabels(labels), WithKey(key))
+	switch {
+	case err != nil:
+		return "", err
+	case len(matched) != 1:
+		return "", ErrTooMany
+	}
+	id := matched[0].ID.Hex()
+	return id, nil
+}
+
+//Find gets kvs of a domain by key and labels.
+//Without WithExactLabels it returns every kv whose labels contain the given labels.
+//Because labels have a lot of combinations,
+//you can use WithExactLabels to return only the one kv whose labels exactly match the criteria.
+//It returns ErrMissingDomain if domain is empty and ErrNotExists if nothing matches.
+func (s *MongodbService) Find(domain string, options ...CallOption) ([]*model.KV, error) {
+	opts := CallOptions{}
+	for _, o := range options {
+		o(&opts)
+	}
+	if domain == "" {
+		return nil, ErrMissingDomain
+	}
+	collection := s.c.Database(DB).Collection(CollectionKV)
+	//cancel releases the resources of the timeout context as soon as the call returns
+	ctx, cancel := context.WithTimeout(context.Background(), DefaultTimeout)
+	defer cancel()
+	filter := bson.M{"domain": domain}
+	if opts.Key != "" {
+		filter["key"] = opts.Key
+	}
+	for k, v := range opts.Labels {
+		filter["labels."+k] = v
+	}
+
+	cur, err := collection.Find(ctx, filter)
+	if err != nil {
+		if err.Error() == context.DeadlineExceeded.Error() {
+			return nil, ErrAction("find", opts.Key, opts.Labels, domain, fmt.Errorf("can not reach mongodb in %s", s.timeout))
+		}
+		return nil, err
+	}
+	defer cur.Close(ctx)
+	//report the error of the cursor itself, err of the Find call is nil at this point
+	if err := cur.Err(); err != nil {
+		return nil, err
+	}
+	if opts.ExactLabels {
+		openlogging.Debug(fmt.Sprintf("find one [%s] with lables [%s] in [%s]", opts.Key, opts.Labels, domain))
+		curKV := &model.KV{} //reuse this pointer to reduce GC, only clear label
+		//the filter matches every kv that contains the labels,
+		//check label length to get the exact match
+		for cur.Next(ctx) { //although complexity is O(n), but there won't be so much labels for one key
+			curKV.Labels = nil
+			if err := cur.Decode(curKV); err != nil {
+				openlogging.Error("decode error: " + err.Error())
+				return nil, err
+			}
+			if len(curKV.Labels) == len(opts.Labels) {
+				openlogging.Debug("hit")
+				return []*model.KV{curKV}, nil
+			}
+		}
+		//an aborted iteration must not be mistaken for a missing kv
+		if err := cur.Err(); err != nil {
+			return nil, err
+		}
+		return nil, ErrNotExists
+	}
+	kvs := make([]*model.KV, 0)
+	for cur.Next(ctx) {
+		curKV := &model.KV{}
+		if err := cur.Decode(curKV); err != nil {
+			openlogging.Error("decode to KVs error: " + err.Error())
+			return nil, err
+		}
+		kvs = append(kvs, curKV)
+	}
+	//an aborted iteration must not return a partial result
+	if err := cur.Err(); err != nil {
+		return nil, err
+	}
+	if len(kvs) == 0 {
+		return nil, ErrNotExists
+	}
+	return kvs, nil
+}
+//DeleteByID deletes the kv whose ID is the given hex string.
+//It returns an error if id is not a valid hex ID or if mongodb fails to delete.
+//Deleting an ID that does not exist is not an error, it is only logged.
+func (s *MongodbService) DeleteByID(id string) error {
+	collection := s.c.Database(DB).Collection(CollectionKV)
+	hex, err := primitive.ObjectIDFromHex(id)
+	if err != nil {
+		openlogging.Error(fmt.Sprintf("convert %s ,err:%s", id, err))
+		return err
+	}
+	//cancel releases the resources of the timeout context as soon as the call returns
+	ctx, cancel := context.WithTimeout(context.Background(), DefaultTimeout)
+	defer cancel()
+	dr, err := collection.DeleteOne(ctx, bson.M{"_id": hex})
+	if err != nil {
+		openlogging.Error(fmt.Sprintf("delete [%s] failed: %s", hex, err))
+		//the delete result must not be read after a failure
+		return err
+	}
+	if dr.DeletedCount != 1 {
+		openlogging.Warn(fmt.Sprintf("delete [%s], but it is not exist", hex))
+	}
+	return nil
+}
+
+//Delete is not implemented yet, it ignores its arguments and always returns nil
+//TODO delete the kv which matches key, domain and labels
+func (s *MongodbService) Delete(key, domain string, labels model.Labels) error {
+ return nil
+}
+//AddHistory inserts the ID, value, revision and checker of the kv
+//as a new record into the revision collection.
+//It logs and returns the error of the insertion.
+func (s *MongodbService) AddHistory(kv *model.KV) error {
+	collection := s.c.Database(DB).Collection(CollectionRevision)
+	//cancel releases the resources of the timeout context as soon as the call returns
+	ctx, cancel := context.WithTimeout(context.Background(), DefaultTimeout)
+	defer cancel()
+	h := &model.KVHistory{
+		KID:      kv.ID.Hex(),
+		Value:    kv.Value,
+		Revision: kv.Revision,
+		Checker:  kv.Checker,
+	}
+	if _, err := collection.InsertOne(ctx, h); err != nil {
+		openlogging.Error(err.Error())
+		return err
+	}
+	return nil
+}
+func (s *MongodbService) getLatest(id primitive.ObjectID) (*model.KVHistory, error) {
+ collection := s.c.Database(DB).Collection(CollectionRevision)
+ ctx, _ := context.WithTimeout(context.Background(), DefaultTimeout)
+
+ filter := bson.M{"kvID": id.Hex()}
+
+ cur, err := collection.Find(ctx, filter,
+ options.Find().SetSort(map[string]interface{}{
+ "revision": -1,
+ }), options.Find().SetLimit(1))
+ if err != nil {
+ return nil, err
+ }
+ h := &model.KVHistory{}
+ var exist bool
+ for cur.Next(ctx) {
+ if err := cur.Decode(h); err != nil {
+ openlogging.Error("decode to KVs error: " + err.Error())
+ return nil, err
+ }
+ exist = true
+ break
+ }
+ if !exist {
+ return nil, nil
+ }
+ return h, nil
+}
+//NewMongoService creates a kv Service backed by mongodb.
+//DefaultTimeout is used if opts.Timeout is 0.
+//It returns the error of getClient if the client can not be obtained.
+func NewMongoService(opts Options) (Service, error) {
+	timeout := opts.Timeout
+	if timeout == 0 {
+		timeout = DefaultTimeout
+	}
+	opts.Timeout = timeout
+	mongoClient, err := getClient(opts)
+	if err != nil {
+		return nil, err
+	}
+	return &MongodbService{
+		c:       mongoClient,
+		timeout: timeout,
+	}, nil
+}
+func getClient(opts Options) (*mongo.Client, error) {
+ if client == nil {
+ var err error
+ client, err = mongo.NewClient(options.Client().ApplyURI(opts.URI))
+ if err != nil {
+ return nil, err
+ }
+ openlogging.Info("connecting to " + opts.URI)
+ ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
+ err = client.Connect(ctx)
+ if err != nil {
+ return nil, err
+ }
+ openlogging.Info("connected to " + opts.URI)
+ }
+ return client, nil
+}
diff --git a/server/kv/options.go b/server/kv/options.go
new file mode 100644
index 0000000..aabe1ec
--- /dev/null
+++ b/server/kv/options.go
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kv
+
+import "github.com/apache/servicecomb-kie/pkg/model"
+
+//CallOptions is the criteria of a query, it is filled by CallOption functions
+type CallOptions struct {
+ ExactLabels bool
+ Key string
+ Labels model.Labels
+}
+
+//CallOption sets one field of CallOptions
+type CallOption func(*CallOptions)
+
+//WithExactLabels returns a CallOption which sets ExactLabels to true,
+//it asks the service for the single kv whose labels exactly match
+func WithExactLabels() CallOption {
+	return func(callOpts *CallOptions) {
+		callOpts.ExactLabels = true
+	}
+}
+
+//WithKey returns a CallOption which sets the key to find
+func WithKey(key string) CallOption {
+	return func(callOpts *CallOptions) {
+		callOpts.Key = key
+	}
+}
+
+//WithLabels returns a CallOption which sets the labels to find
+func WithLabels(labels model.Labels) CallOption {
+	return func(callOpts *CallOptions) {
+		callOpts.Labels = labels
+	}
+}