You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2018/01/29 08:19:23 UTC
[incubator-servicecomb-service-center] branch master updated:
SCB-269 remove lock for dependency handle (#258)
This is an automated email from the ASF dual-hosted git repository.
littlecui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new 2f3030b SCB-269 remove lock for dependency handle (#258)
2f3030b is described below
commit 2f3030bb7e8dd57e597823009f85fd9f57703180
Author: aseTo2016 <14...@qq.com>
AuthorDate: Mon Jan 29 16:19:21 2018 +0800
SCB-269 remove lock for dependency handle (#258)
* SCB-269 remove lock for dependency handle
* SCB-269 remove lock for dependency handle
* SCB-269 remove lock for dependency handle
* SCB-269 remove lock for dependency handle
* SCB-269 remove lock for dependency handle
---
pkg/tree/tree.go | 67 ++++++++++++
pkg/tree/tree_test.go | 39 +++++++
server/service/event/dependency_event_handler.go | 128 +++++++++--------------
server/service/microservices.go | 11 +-
server/service/service_dependency.go | 2 +-
server/service/service_dependency_test.go | 17 ++-
server/service/util/dependency.go | 61 +++--------
server/service/util/dependency_test.go | 6 +-
8 files changed, 188 insertions(+), 143 deletions(-)
diff --git a/pkg/tree/tree.go b/pkg/tree/tree.go
new file mode 100644
index 0000000..ca3136a
--- /dev/null
+++ b/pkg/tree/tree.go
@@ -0,0 +1,67 @@
+package tree
+
+//The tree is a binary sort tree (binary search tree)
+type tree struct {
+ root *Node
+ isAddToLeft func(node *Node, addRes interface{}) bool
+}
+
+func NewTree(isAddToLeft func(node *Node, addRes interface{}) bool) *tree {
+ return &tree{
+ isAddToLeft: isAddToLeft,
+ }
+}
+
+type Node struct {
+ Res interface{}
+ left, right *Node
+}
+
+func (t *tree) GetRoot()*Node {
+ return t.root
+}
+
+//add res into tree
+func (t *tree) AddNode(res interface{}) *Node {
+ return t.addNode(t.root, res)
+}
+
+func (t *tree) addNode(n *Node, res interface{}) *Node{
+ if n == nil {
+ n = new(Node)
+ n.Res = res
+ if t.root == nil {
+ t.root = n
+ }
+ return n
+ }
+ if t.isAddToLeft(n, res) {
+ n.left = t.addNode(n.left, res)
+ } else {
+ n.right = t.addNode(n.right, res)
+ }
+ return n
+}
+
+//in-order (middle-order) traversal: handle is the func that deals with each res, n is the node at which the traversal starts
+func (t *tree) InOrderTraversal(n *Node, handle func(res interface{}) error) error {
+ if n == nil {
+ return nil
+ }
+
+ err := t.InOrderTraversal(n.left, handle)
+ if err != nil {
+ return err
+ }
+ err = handle(n.Res)
+ if err != nil {
+ return err
+ }
+ err = t.InOrderTraversal(n.right, handle)
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+//todo: add an asynchronous variant that runs the handle func in a goroutine (go handle)
\ No newline at end of file
diff --git a/pkg/tree/tree_test.go b/pkg/tree/tree_test.go
new file mode 100644
index 0000000..902b14a
--- /dev/null
+++ b/pkg/tree/tree_test.go
@@ -0,0 +1,39 @@
+package tree
+
+import (
+ "fmt"
+ "testing"
+ "reflect"
+)
+
+func TestTree(t *testing.T) {
+ compareFunc := func(node *Node, addRes interface{}) bool {
+ k := addRes.(int)
+ kCompare := node.Res.(int)
+ if k > kCompare {
+ return false
+ }
+ return true
+ }
+ testSlice := []int{6,3,7,2,4,5}
+ targetSlice := []int{2,3,4,5,6,7}
+ slice := testSlice[:0]
+ handle := func(res interface{}) error {
+ slice = append(slice, res.(int))
+ return nil
+ }
+
+
+ testTree := NewTree(compareFunc)
+
+ for _, v := range testSlice {
+ testTree.AddNode(v)
+ }
+
+
+ testTree.InOrderTraversal(testTree.GetRoot(), handle)
+ if !reflect.DeepEqual(slice, targetSlice) {
+ fmt.Printf(`TestTree failed`)
+ t.FailNow()
+ }
+}
\ No newline at end of file
diff --git a/server/service/event/dependency_event_handler.go b/server/service/event/dependency_event_handler.go
index d1bd311..67f2d46 100644
--- a/server/service/event/dependency_event_handler.go
+++ b/server/service/event/dependency_event_handler.go
@@ -30,6 +30,7 @@ import (
"github.com/coreos/etcd/mvcc/mvccpb"
"golang.org/x/net/context"
"time"
+ "github.com/apache/incubator-servicecomb-service-center/pkg/tree"
)
type DependencyEventHandler struct {
@@ -104,6 +105,15 @@ func NewDependencyEventHandlerResource(dep *pb.ConsumerDependency, kv *mvccpb.Ke
}
}
+func isAddToLeft(centerNode *tree.Node, addRes interface{}) bool {
+ res := addRes.(*DependencyEventHandlerResource)
+ compareRes := centerNode.Res.(*DependencyEventHandlerResource)
+ if res.kv.ModRevision > compareRes.kv.ModRevision {
+ return false
+ }
+ return true
+}
+
func (h *DependencyEventHandler) Handle() error {
key := core.GetServiceDependencyQueueRootKey("")
resp, err := store.Store().DependencyQueue().Search(context.Background(),
@@ -119,10 +129,10 @@ func (h *DependencyEventHandler) Handle() error {
return nil
}
- lenKvs := len(resp.Kvs)
- resourcesMap := make(map[string][]*DependencyEventHandlerResource, lenKvs)
-
ctx := context.Background()
+
+ dependencyTree := tree.NewTree(isAddToLeft)
+
for _, kv := range resp.Kvs {
r := &pb.ConsumerDependency{}
consumerId, domainProject, data := pb.GetInfoFromDependencyQueueKV(kv)
@@ -138,102 +148,58 @@ func (h *DependencyEventHandler) Handle() error {
continue
}
- lockKey := serviceUtil.NewDependencyLockKey(domainProject, r.Consumer.Environment)
res := NewDependencyEventHandlerResource(r, kv, domainProject)
- resources := resourcesMap[lockKey]
- if resources == nil {
- resources = make([]*DependencyEventHandlerResource, 0, lenKvs)
- }
- resources = append(resources, res)
- resourcesMap[lockKey] = resources
- }
- dependencyRuleHandleResults := make(chan error, len(resourcesMap))
- for lockKey, resources := range resourcesMap {
- go func(lockKey string, resources []*DependencyEventHandlerResource) {
- err := h.dependencyRuleHandle(ctx, lockKey, resources)
- dependencyRuleHandleResults <- err
- }(lockKey, resources)
- }
- var lastErr error
- finishedCount := 0
- for err := range dependencyRuleHandleResults {
- finishedCount++
- if err != nil {
- lastErr = err
- }
- if finishedCount == len(resourcesMap) {
- close(dependencyRuleHandleResults)
- }
+ dependencyTree.AddNode(res)
}
- return lastErr
+
+ return dependencyTree.InOrderTraversal(dependencyTree.GetRoot(), h.dependencyRuleHandle)
}
-func (h *DependencyEventHandler) dependencyRuleHandle(ctx context.Context, lockKey string, resources []*DependencyEventHandlerResource) error {
- lock, err := serviceUtil.DependencyLock(lockKey)
- if err != nil {
- util.Logger().Errorf(err, "create dependency rule locker failed")
- return err
+func (h *DependencyEventHandler) dependencyRuleHandle(res interface{}) error {
+ ctx := context.Background()
+ dependencyEventHandlerRes := res.(*DependencyEventHandlerResource)
+ r := dependencyEventHandlerRes.dep
+ consumerFlag := util.StringJoin([]string{r.Consumer.AppId, r.Consumer.ServiceName, r.Consumer.Version}, "/")
+
+
+ domainProject := dependencyEventHandlerRes.domainProject
+ consumerInfo := pb.DependenciesToKeys([]*pb.MicroServiceKey{r.Consumer}, domainProject)[0]
+ providersInfo := pb.DependenciesToKeys(r.Providers, domainProject)
+
+ var dep serviceUtil.Dependency
+ var err error
+ dep.DomainProject = domainProject
+ dep.Consumer = consumerInfo
+ dep.ProvidersRule = providersInfo
+ if r.Override {
+ err = serviceUtil.CreateDependencyRule(ctx, &dep)
+ } else {
+ err = serviceUtil.AddDependencyRule(ctx, &dep)
}
- defer lock.Unlock()
- for _, res := range resources {
- r := res.dep
- consumerFlag := util.StringJoin([]string{r.Consumer.AppId, r.Consumer.ServiceName, r.Consumer.Version}, "/")
- domainProject := res.domainProject
- consumerInfo := pb.DependenciesToKeys([]*pb.MicroServiceKey{r.Consumer}, domainProject)[0]
- providersInfo := pb.DependenciesToKeys(r.Providers, domainProject)
-
- consumerId, err := serviceUtil.GetServiceId(ctx, consumerInfo)
- if err != nil {
- util.Logger().Errorf(err, "modify dependency rule failed, override: %t, consumer %s", r.Override, consumerFlag)
- return fmt.Errorf("get consumer %s id failed, override: %t, %s", consumerFlag, r.Override, err.Error())
- }
- if len(consumerId) == 0 {
- util.Logger().Errorf(nil, "maintain dependency failed, override: %t, consumer %s does not exist",
- r.Override, consumerFlag)
-
- if err = h.removeKV(ctx, res.kv); err != nil {
- util.Logger().Errorf(err, "remove dependency rule failed, override: %t, consumer %s", r.Override, consumerFlag)
- return err
- }
- continue
- }
-
- var dep serviceUtil.Dependency
- dep.DomainProject = domainProject
- dep.Consumer = consumerInfo
- dep.ProvidersRule = providersInfo
- dep.ConsumerId = consumerId
- if r.Override {
- err = serviceUtil.CreateDependencyRule(ctx, &dep)
- } else {
- err = serviceUtil.AddDependencyRule(ctx, &dep)
- }
-
- if err != nil {
- util.Logger().Errorf(err, "modify dependency rule failed, override: %t, consumer %s", r.Override, consumerFlag)
- return fmt.Errorf("override: %t, consumer is %s, %s", r.Override, consumerFlag, err.Error())
- }
-
- if err = h.removeKV(ctx, res.kv); err != nil {
- util.Logger().Errorf(err, "remove dependency rule failed, override: %t, consumer %s", r.Override, consumerFlag)
- return err
- }
+ if err != nil {
+ util.Logger().Errorf(err, "modify dependency rule failed, override: %t, consumer %s", r.Override, consumerFlag)
+ return fmt.Errorf("override: %t, consumer is %s, %s", r.Override, consumerFlag, err.Error())
+ }
- util.Logger().Infof("maintain dependency %v successfully, override: %t", r, r.Override)
+ if err = h.removeKV(ctx, dependencyEventHandlerRes.kv); err != nil {
+ util.Logger().Errorf(err, "remove dependency rule failed, override: %t, consumer %s", r.Override, consumerFlag)
+ return err
}
+
+ util.Logger().Infof("maintain dependency %v successfully, override: %t", r, r.Override)
return nil
}
func (h *DependencyEventHandler) removeKV(ctx context.Context, kv *mvccpb.KeyValue) error {
- dresp, err := backend.Registry().TxnWithCmp(ctx, []registry.PluginOp{registry.OpDel(registry.WithKey(kv.Key))},
+ dResp, err := backend.Registry().TxnWithCmp(ctx, []registry.PluginOp{registry.OpDel(registry.WithKey(kv.Key))},
[]registry.CompareOp{registry.OpCmp(registry.CmpVer(kv.Key), registry.CMP_EQUAL, kv.Version)},
nil)
if err != nil {
return fmt.Errorf("can not remove the dependency %s request, %s", util.BytesToStringWithNoCopy(kv.Key), err.Error())
}
- if !dresp.Succeeded {
+ if !dResp.Succeeded {
util.Logger().Infof("the dependency %s request is changed", util.BytesToStringWithNoCopy(kv.Key))
}
return nil
diff --git a/server/service/microservices.go b/server/service/microservices.go
index dcaa4be..60463a8 100644
--- a/server/service/microservices.go
+++ b/server/service/microservices.go
@@ -278,19 +278,12 @@ func (s *MicroServiceService) DeleteServicePri(ctx context.Context, serviceId st
}
//删除依赖规则
- lock, err := serviceUtil.DependencyLock(serviceUtil.NewDependencyLockKey(domainProject, service.Environment))
- if err != nil {
- util.Logger().Errorf(err, "%s micro-service failed, serviceId is %s: inner err, create lock failed.", title, serviceId)
- return pb.CreateResponse(scerr.ErrUnavailableBackend, err.Error()), err
- }
-
- defer lock.Unlock()
- optsTmp, err := serviceUtil.DeleteDependencyForService(ctx, serviceKey)
+ optDeleteDep, err := serviceUtil.DeleteDependencyForDeleteService(domainProject, serviceId, serviceKey)
if err != nil {
util.Logger().Errorf(err, "%s micro-service failed, serviceId is %s: inner err, delete dependency failed.", title, serviceId)
return pb.CreateResponse(scerr.ErrInternal, err.Error()), err
}
- opts = append(opts, optsTmp...)
+ opts = append(opts, optDeleteDep)
//删除黑白名单
opts = append(opts, registry.OpDel(
diff --git a/server/service/service_dependency.go b/server/service/service_dependency.go
index c5857f1..3e2d38f 100644
--- a/server/service/service_dependency.go
+++ b/server/service/service_dependency.go
@@ -51,7 +51,7 @@ func (s *MicroServiceService) AddOrUpdateDependencies(ctx context.Context, depen
opts := make([]registry.PluginOp, 0, len(dependencyInfos))
domainProject := util.ParseDomainProject(ctx)
for _, dependencyInfo := range dependencyInfos {
- if len(dependencyInfo.Providers) == 0 || dependencyInfo.Consumer == nil {
+ if (len(dependencyInfo.Providers) == 0 && !override) || dependencyInfo.Consumer == nil {
return serviceUtil.BadParamsResponse("Provider is invalid").Response, nil
}
diff --git a/server/service/service_dependency_test.go b/server/service/service_dependency_test.go
index 719fd00..90803c5 100644
--- a/server/service/service_dependency_test.go
+++ b/server/service/service_dependency_test.go
@@ -26,7 +26,7 @@ import (
var deh event.DependencyEventHandler
var _ = Describe("'Dependency' service", func() {
- Describe("execute 'create' operartion", func() {
+ Describe("execute 'create' operation", func() {
var (
consumerId1 string
consumerId2 string
@@ -283,12 +283,25 @@ var _ = Describe("'Dependency' service", func() {
Version: "1.0.0",
}
- By("add latest")
+ By("add provider is empty")
respCreateDependency, err := serviceResource.CreateDependenciesForMicroServices(getContext(), &pb.CreateDependenciesRequest{
Dependencies: []*pb.ConsumerDependency{
{
Consumer: consumer,
Providers: []*pb.MicroServiceKey{
+ },
+ },
+ },
+ })
+ Expect(err).To(BeNil())
+ Expect(respCreateDependency.Response.Code).To(Equal(pb.Response_SUCCESS))
+
+ By("add latest")
+ respCreateDependency, err = serviceResource.CreateDependenciesForMicroServices(getContext(), &pb.CreateDependenciesRequest{
+ Dependencies: []*pb.ConsumerDependency{
+ {
+ Consumer: consumer,
+ Providers: []*pb.MicroServiceKey{
{
AppId: "create_dep_group",
ServiceName: "create_dep_provider",
diff --git a/server/service/util/dependency.go b/server/service/util/dependency.go
index 68c6bf7..3e9e929 100644
--- a/server/service/util/dependency.go
+++ b/server/service/util/dependency.go
@@ -21,7 +21,6 @@ import (
"errors"
"fmt"
"github.com/apache/incubator-servicecomb-service-center/pkg/cache"
- "github.com/apache/incubator-servicecomb-service-center/pkg/etcdsync"
"github.com/apache/incubator-servicecomb-service-center/pkg/util"
apt "github.com/apache/incubator-servicecomb-service-center/server/core"
"github.com/apache/incubator-servicecomb-service-center/server/core/backend"
@@ -29,7 +28,6 @@ import (
pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto"
scerr "github.com/apache/incubator-servicecomb-service-center/server/error"
"github.com/apache/incubator-servicecomb-service-center/server/infra/registry"
- "github.com/apache/incubator-servicecomb-service-center/server/mux"
"golang.org/x/net/context"
"strings"
"time"
@@ -279,42 +277,6 @@ func AddServiceVersionRule(ctx context.Context, domainProject string, consumer *
return nil
}
-func DeleteDependencyForService(ctx context.Context, service *pb.MicroServiceKey) ([]registry.PluginOp, error) {
- domainProject := service.Tenant
- //删除依赖规则
- conKey := apt.GenerateConsumerDependencyRuleKey(domainProject, service)
- providerValue, err := TransferToMicroServiceDependency(ctx, conKey)
- if err != nil {
- return nil, err
- }
- opts := make([]registry.PluginOp, 0)
- if providerValue != nil && len(providerValue.Dependency) != 0 {
- providerRuleKey := ""
- for _, providerRule := range providerValue.Dependency {
- providerRuleKey = apt.GenerateProviderDependencyRuleKey(domainProject, providerRule)
- consumers, err := TransferToMicroServiceDependency(ctx, providerRuleKey)
- if err != nil {
- return nil, err
- }
- opt, err := updateProviderDependencyRuleUtil(consumers, service, providerRuleKey)
- if err != nil {
- return nil, err
- }
- opts = append(opts, opt)
- }
- }
- util.Logger().Infof("delete dependency rule, consumer Key is %s.", conKey)
- opts = append(opts, registry.OpDel(registry.WithStrKey(conKey)))
-
- //作为provider的依赖规则
- providerKey := apt.GenerateProviderDependencyRuleKey(domainProject, service)
-
- util.Logger().Infof("delete dependency rule, providerKey is %s", providerKey)
- opts = append(opts, registry.OpDel(registry.WithStrKey(providerKey)))
-
- return opts, nil
-}
-
func TransferToMicroServiceDependency(ctx context.Context, key string) (*pb.MicroServiceDependency, error) {
microServiceDependency := &pb.MicroServiceDependency{
Dependency: []*pb.MicroServiceKey{},
@@ -505,8 +467,8 @@ func CreateDependencyRule(ctx context.Context, dep *Dependency) error {
}
func isDependencyAll(dep *pb.MicroServiceDependency) bool {
- for _, servicedep := range dep.Dependency {
- if servicedep.ServiceName == "*" {
+ for _, serviceDep := range dep.Dependency {
+ if serviceDep.ServiceName == "*" {
return true
}
}
@@ -604,7 +566,7 @@ func validateMicroServiceKey(in *pb.MicroServiceKey, fuzzyMatch bool) error {
}
func BadParamsResponse(detailErr string) *pb.CreateDependenciesResponse {
- util.Logger().Errorf(nil, "Request params is invalid.")
+ util.Logger().Errorf(nil, "Request params is invalid.%s", detailErr)
if len(detailErr) == 0 {
detailErr = "Request params is invalid."
}
@@ -1054,10 +1016,15 @@ func (dr *DependencyRelation) getConsumerOfSameServiceNameAndAppId(provider *pb.
return allConsumers, nil
}
-func DependencyLock(lockKey string) (*etcdsync.DLock, error) {
- return mux.Lock(mux.MuxType(lockKey))
-}
-
-func NewDependencyLockKey(domainProject, env string) string {
- return util.StringJoin([]string{"", "env-lock", domainProject, env}, "/")
+func DeleteDependencyForDeleteService(domainProject string, serviceId string, service *pb.MicroServiceKey) (registry.PluginOp, error) {
+ key := apt.GenerateConsumerDependencyQueueKey(domainProject, serviceId, "0")
+ conDep := new(pb.ConsumerDependency)
+ conDep.Consumer = service
+ conDep.Providers = []*pb.MicroServiceKey{}
+ conDep.Override = true
+ data, err := json.Marshal(conDep)
+ if err != nil {
+ return registry.PluginOp{}, err
+ }
+ return registry.OpPut(registry.WithStrKey(key), registry.WithValue(data)), nil
}
diff --git a/server/service/util/dependency_test.go b/server/service/util/dependency_test.go
index c1326e9..546459e 100644
--- a/server/service/util/dependency_test.go
+++ b/server/service/util/dependency_test.go
@@ -33,9 +33,9 @@ func TestRefreshDependencyCache(t *testing.T) {
}
func TestDeleteDependencyForService(t *testing.T) {
- _, err := DeleteDependencyForService(context.Background(), &proto.MicroServiceKey{})
- if err == nil {
- fmt.Printf(`DeleteDependencyForService failed`)
+ _, err := DeleteDependencyForDeleteService("", "", &proto.MicroServiceKey{})
+ if err != nil {
+ fmt.Printf(`DeleteDependencyForDeleteService failed`)
t.FailNow()
}
--
To stop receiving notification emails like this one, please contact
littlecui@apache.org.