You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2018/01/29 08:19:23 UTC

[incubator-servicecomb-service-center] branch master updated: SCB-269 remove lock for dependency handle (#258)

This is an automated email from the ASF dual-hosted git repository.

littlecui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f3030b  SCB-269 remove lock for dependency handle (#258)
2f3030b is described below

commit 2f3030bb7e8dd57e597823009f85fd9f57703180
Author: aseTo2016 <14...@qq.com>
AuthorDate: Mon Jan 29 16:19:21 2018 +0800

    SCB-269 remove lock for dependency handle (#258)
    
    * SCB-269 remove lock for dependency handle
    
    * SCB-269 remove lock for dependency handle
    
    * SCB-269 remove lock for dependency handle
    
    * SCB-269 remove lock for dependency handle
    
    * SCB-269 remove lock for dependency handle
---
 pkg/tree/tree.go                                 |  67 ++++++++++++
 pkg/tree/tree_test.go                            |  39 +++++++
 server/service/event/dependency_event_handler.go | 128 +++++++++--------------
 server/service/microservices.go                  |  11 +-
 server/service/service_dependency.go             |   2 +-
 server/service/service_dependency_test.go        |  17 ++-
 server/service/util/dependency.go                |  61 +++--------
 server/service/util/dependency_test.go           |   6 +-
 8 files changed, 188 insertions(+), 143 deletions(-)

diff --git a/pkg/tree/tree.go b/pkg/tree/tree.go
new file mode 100644
index 0000000..ca3136a
--- /dev/null
+++ b/pkg/tree/tree.go
@@ -0,0 +1,67 @@
+package tree
+
+//The tree is a binary sort tree
+type tree struct {
+	root *Node
+	isAddToLeft func(node *Node, addRes interface{}) bool
+}
+
+func NewTree(isAddToLeft func(node *Node, addRes interface{}) bool) *tree {
+	return &tree{
+		isAddToLeft: isAddToLeft,
+	}
+}
+
+type Node struct {
+	Res         interface{}
+	left, right *Node
+}
+
+func (t *tree) GetRoot()*Node {
+	return t.root
+}
+
+//add res into tree
+func (t *tree) AddNode(res interface{}) *Node {
+	return t.addNode(t.root, res)
+}
+
+func (t *tree) addNode(n *Node, res interface{}) *Node{
+	if n == nil {
+		n = new(Node)
+		n.Res = res
+		if t.root == nil {
+			t.root = n
+		}
+		return n
+	}
+	if t.isAddToLeft(n, res) {
+		n.left = t.addNode(n.left, res)
+	} else {
+		n.right = t.addNode(n.right, res)
+	}
+	return n
+}
+
+//in-order traversal, handle is the func that deals with the res, n is the node to start the traversal from
+func (t *tree) InOrderTraversal(n *Node, handle func(res interface{}) error) error {
+	if n == nil {
+		return nil
+	}
+
+	err := t.InOrderTraversal(n.left, handle)
+	if err != nil {
+		return err
+	}
+	err = handle(n.Res)
+	if err != nil {
+		return err
+	}
+	err = t.InOrderTraversal(n.right, handle)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+//todo: add asynchronous handling, i.e. run the handle func in a goroutine: go handle
\ No newline at end of file
diff --git a/pkg/tree/tree_test.go b/pkg/tree/tree_test.go
new file mode 100644
index 0000000..902b14a
--- /dev/null
+++ b/pkg/tree/tree_test.go
@@ -0,0 +1,39 @@
+package tree
+
+import (
+	"fmt"
+	"testing"
+	"reflect"
+)
+
+func TestTree(t *testing.T) {
+	compareFunc := func(node *Node, addRes interface{}) bool {
+		k := addRes.(int)
+		kCompare := node.Res.(int)
+		if k > kCompare {
+			return false
+		}
+		return true
+	}
+	testSlice := []int{6,3,7,2,4,5}
+	targetSlice := []int{2,3,4,5,6,7}
+	slice := testSlice[:0]
+	handle := func(res interface{}) error {
+		slice = append(slice, res.(int))
+		return nil
+	}
+
+
+	testTree := NewTree(compareFunc)
+
+	for _, v := range testSlice {
+		testTree.AddNode(v)
+	}
+
+
+	testTree.InOrderTraversal(testTree.GetRoot(), handle)
+	if !reflect.DeepEqual(slice, targetSlice) {
+		fmt.Printf(`TestTree failed`)
+		t.FailNow()
+	}
+}
\ No newline at end of file
diff --git a/server/service/event/dependency_event_handler.go b/server/service/event/dependency_event_handler.go
index d1bd311..67f2d46 100644
--- a/server/service/event/dependency_event_handler.go
+++ b/server/service/event/dependency_event_handler.go
@@ -30,6 +30,7 @@ import (
 	"github.com/coreos/etcd/mvcc/mvccpb"
 	"golang.org/x/net/context"
 	"time"
+	"github.com/apache/incubator-servicecomb-service-center/pkg/tree"
 )
 
 type DependencyEventHandler struct {
@@ -104,6 +105,15 @@ func NewDependencyEventHandlerResource(dep *pb.ConsumerDependency, kv *mvccpb.Ke
 	}
 }
 
+func isAddToLeft(centerNode *tree.Node, addRes interface{}) bool {
+	res := addRes.(*DependencyEventHandlerResource)
+	compareRes := centerNode.Res.(*DependencyEventHandlerResource)
+	if res.kv.ModRevision > compareRes.kv.ModRevision {
+		return false
+	}
+	return true
+}
+
 func (h *DependencyEventHandler) Handle() error {
 	key := core.GetServiceDependencyQueueRootKey("")
 	resp, err := store.Store().DependencyQueue().Search(context.Background(),
@@ -119,10 +129,10 @@ func (h *DependencyEventHandler) Handle() error {
 		return nil
 	}
 
-	lenKvs := len(resp.Kvs)
-	resourcesMap := make(map[string][]*DependencyEventHandlerResource, lenKvs)
-
 	ctx := context.Background()
+
+	dependencyTree := tree.NewTree(isAddToLeft)
+
 	for _, kv := range resp.Kvs {
 		r := &pb.ConsumerDependency{}
 		consumerId, domainProject, data := pb.GetInfoFromDependencyQueueKV(kv)
@@ -138,102 +148,58 @@ func (h *DependencyEventHandler) Handle() error {
 			continue
 		}
 
-		lockKey := serviceUtil.NewDependencyLockKey(domainProject, r.Consumer.Environment)
 		res := NewDependencyEventHandlerResource(r, kv, domainProject)
-		resources := resourcesMap[lockKey]
-		if resources == nil {
-			resources = make([]*DependencyEventHandlerResource, 0, lenKvs)
-		}
-		resources = append(resources, res)
-		resourcesMap[lockKey] = resources
-	}
 
-	dependencyRuleHandleResults := make(chan error, len(resourcesMap))
-	for lockKey, resources := range resourcesMap {
-		go func(lockKey string, resources []*DependencyEventHandlerResource) {
-			err := h.dependencyRuleHandle(ctx, lockKey, resources)
-			dependencyRuleHandleResults <- err
-		}(lockKey, resources)
-	}
-	var lastErr error
-	finishedCount := 0
-	for err := range dependencyRuleHandleResults {
-		finishedCount++
-		if err != nil {
-			lastErr = err
-		}
-		if finishedCount == len(resourcesMap) {
-			close(dependencyRuleHandleResults)
-		}
+		dependencyTree.AddNode(res)
 	}
-	return lastErr
+
+	return dependencyTree.InOrderTraversal(dependencyTree.GetRoot(), h.dependencyRuleHandle)
 }
 
-func (h *DependencyEventHandler) dependencyRuleHandle(ctx context.Context, lockKey string, resources []*DependencyEventHandlerResource) error {
-	lock, err := serviceUtil.DependencyLock(lockKey)
-	if err != nil {
-		util.Logger().Errorf(err, "create dependency rule locker failed")
-		return err
+func (h *DependencyEventHandler) dependencyRuleHandle(res interface{}) error {
+	ctx := context.Background()
+	dependencyEventHandlerRes := res.(*DependencyEventHandlerResource)
+	r := dependencyEventHandlerRes.dep
+	consumerFlag := util.StringJoin([]string{r.Consumer.AppId, r.Consumer.ServiceName, r.Consumer.Version}, "/")
+
+
+	domainProject := dependencyEventHandlerRes.domainProject
+	consumerInfo := pb.DependenciesToKeys([]*pb.MicroServiceKey{r.Consumer}, domainProject)[0]
+	providersInfo := pb.DependenciesToKeys(r.Providers, domainProject)
+
+	var dep serviceUtil.Dependency
+	var err error
+	dep.DomainProject = domainProject
+	dep.Consumer = consumerInfo
+	dep.ProvidersRule = providersInfo
+	if r.Override {
+		err = serviceUtil.CreateDependencyRule(ctx, &dep)
+	} else {
+		err = serviceUtil.AddDependencyRule(ctx, &dep)
 	}
-	defer lock.Unlock()
-	for _, res := range resources {
-		r := res.dep
-		consumerFlag := util.StringJoin([]string{r.Consumer.AppId, r.Consumer.ServiceName, r.Consumer.Version}, "/")
 
-		domainProject := res.domainProject
-		consumerInfo := pb.DependenciesToKeys([]*pb.MicroServiceKey{r.Consumer}, domainProject)[0]
-		providersInfo := pb.DependenciesToKeys(r.Providers, domainProject)
-
-		consumerId, err := serviceUtil.GetServiceId(ctx, consumerInfo)
-		if err != nil {
-			util.Logger().Errorf(err, "modify dependency rule failed, override: %t, consumer %s", r.Override, consumerFlag)
-			return fmt.Errorf("get consumer %s id failed, override: %t, %s", consumerFlag, r.Override, err.Error())
-		}
-		if len(consumerId) == 0 {
-			util.Logger().Errorf(nil, "maintain dependency failed, override: %t, consumer %s does not exist",
-				r.Override, consumerFlag)
-
-			if err = h.removeKV(ctx, res.kv); err != nil {
-				util.Logger().Errorf(err, "remove dependency rule failed, override: %t, consumer %s", r.Override, consumerFlag)
-				return err
-			}
-			continue
-		}
-
-		var dep serviceUtil.Dependency
-		dep.DomainProject = domainProject
-		dep.Consumer = consumerInfo
-		dep.ProvidersRule = providersInfo
-		dep.ConsumerId = consumerId
-		if r.Override {
-			err = serviceUtil.CreateDependencyRule(ctx, &dep)
-		} else {
-			err = serviceUtil.AddDependencyRule(ctx, &dep)
-		}
-
-		if err != nil {
-			util.Logger().Errorf(err, "modify dependency rule failed, override: %t, consumer %s", r.Override, consumerFlag)
-			return fmt.Errorf("override: %t, consumer is %s, %s", r.Override, consumerFlag, err.Error())
-		}
-
-		if err = h.removeKV(ctx, res.kv); err != nil {
-			util.Logger().Errorf(err, "remove dependency rule failed, override: %t, consumer %s", r.Override, consumerFlag)
-			return err
-		}
+	if err != nil {
+		util.Logger().Errorf(err, "modify dependency rule failed, override: %t, consumer %s", r.Override, consumerFlag)
+		return fmt.Errorf("override: %t, consumer is %s, %s", r.Override, consumerFlag, err.Error())
+	}
 
-		util.Logger().Infof("maintain dependency %v successfully, override: %t", r, r.Override)
+	if err = h.removeKV(ctx, dependencyEventHandlerRes.kv); err != nil {
+		util.Logger().Errorf(err, "remove dependency rule failed, override: %t, consumer %s", r.Override, consumerFlag)
+		return err
 	}
+
+	util.Logger().Infof("maintain dependency %v successfully, override: %t", r, r.Override)
 	return nil
 }
 
 func (h *DependencyEventHandler) removeKV(ctx context.Context, kv *mvccpb.KeyValue) error {
-	dresp, err := backend.Registry().TxnWithCmp(ctx, []registry.PluginOp{registry.OpDel(registry.WithKey(kv.Key))},
+	dResp, err := backend.Registry().TxnWithCmp(ctx, []registry.PluginOp{registry.OpDel(registry.WithKey(kv.Key))},
 		[]registry.CompareOp{registry.OpCmp(registry.CmpVer(kv.Key), registry.CMP_EQUAL, kv.Version)},
 		nil)
 	if err != nil {
 		return fmt.Errorf("can not remove the dependency %s request, %s", util.BytesToStringWithNoCopy(kv.Key), err.Error())
 	}
-	if !dresp.Succeeded {
+	if !dResp.Succeeded {
 		util.Logger().Infof("the dependency %s request is changed", util.BytesToStringWithNoCopy(kv.Key))
 	}
 	return nil
diff --git a/server/service/microservices.go b/server/service/microservices.go
index dcaa4be..60463a8 100644
--- a/server/service/microservices.go
+++ b/server/service/microservices.go
@@ -278,19 +278,12 @@ func (s *MicroServiceService) DeleteServicePri(ctx context.Context, serviceId st
 	}
 
 	//删除依赖规则
-	lock, err := serviceUtil.DependencyLock(serviceUtil.NewDependencyLockKey(domainProject, service.Environment))
-	if err != nil {
-		util.Logger().Errorf(err, "%s micro-service failed, serviceId is %s: inner err, create lock failed.", title, serviceId)
-		return pb.CreateResponse(scerr.ErrUnavailableBackend, err.Error()), err
-	}
-
-	defer lock.Unlock()
-	optsTmp, err := serviceUtil.DeleteDependencyForService(ctx, serviceKey)
+	optDeleteDep, err := serviceUtil.DeleteDependencyForDeleteService(domainProject, serviceId, serviceKey)
 	if err != nil {
 		util.Logger().Errorf(err, "%s micro-service failed, serviceId is %s: inner err, delete dependency failed.", title, serviceId)
 		return pb.CreateResponse(scerr.ErrInternal, err.Error()), err
 	}
-	opts = append(opts, optsTmp...)
+	opts = append(opts, optDeleteDep)
 
 	//删除黑白名单
 	opts = append(opts, registry.OpDel(
diff --git a/server/service/service_dependency.go b/server/service/service_dependency.go
index c5857f1..3e2d38f 100644
--- a/server/service/service_dependency.go
+++ b/server/service/service_dependency.go
@@ -51,7 +51,7 @@ func (s *MicroServiceService) AddOrUpdateDependencies(ctx context.Context, depen
 	opts := make([]registry.PluginOp, 0, len(dependencyInfos))
 	domainProject := util.ParseDomainProject(ctx)
 	for _, dependencyInfo := range dependencyInfos {
-		if len(dependencyInfo.Providers) == 0 || dependencyInfo.Consumer == nil {
+		if (len(dependencyInfo.Providers) == 0  && !override) || dependencyInfo.Consumer == nil {
 			return serviceUtil.BadParamsResponse("Provider is invalid").Response, nil
 		}
 
diff --git a/server/service/service_dependency_test.go b/server/service/service_dependency_test.go
index 719fd00..90803c5 100644
--- a/server/service/service_dependency_test.go
+++ b/server/service/service_dependency_test.go
@@ -26,7 +26,7 @@ import (
 var deh event.DependencyEventHandler
 
 var _ = Describe("'Dependency' service", func() {
-	Describe("execute 'create' operartion", func() {
+	Describe("execute 'create' operation", func() {
 		var (
 			consumerId1 string
 			consumerId2 string
@@ -283,12 +283,25 @@ var _ = Describe("'Dependency' service", func() {
 					Version:     "1.0.0",
 				}
 
-				By("add latest")
+				By("add provider is empty")
 				respCreateDependency, err := serviceResource.CreateDependenciesForMicroServices(getContext(), &pb.CreateDependenciesRequest{
 					Dependencies: []*pb.ConsumerDependency{
 						{
 							Consumer: consumer,
 							Providers: []*pb.MicroServiceKey{
+							},
+						},
+					},
+				})
+				Expect(err).To(BeNil())
+				Expect(respCreateDependency.Response.Code).To(Equal(pb.Response_SUCCESS))
+
+				By("add latest")
+				respCreateDependency, err = serviceResource.CreateDependenciesForMicroServices(getContext(), &pb.CreateDependenciesRequest{
+					Dependencies: []*pb.ConsumerDependency{
+						{
+							Consumer: consumer,
+							Providers: []*pb.MicroServiceKey{
 								{
 									AppId:       "create_dep_group",
 									ServiceName: "create_dep_provider",
diff --git a/server/service/util/dependency.go b/server/service/util/dependency.go
index 68c6bf7..3e9e929 100644
--- a/server/service/util/dependency.go
+++ b/server/service/util/dependency.go
@@ -21,7 +21,6 @@ import (
 	"errors"
 	"fmt"
 	"github.com/apache/incubator-servicecomb-service-center/pkg/cache"
-	"github.com/apache/incubator-servicecomb-service-center/pkg/etcdsync"
 	"github.com/apache/incubator-servicecomb-service-center/pkg/util"
 	apt "github.com/apache/incubator-servicecomb-service-center/server/core"
 	"github.com/apache/incubator-servicecomb-service-center/server/core/backend"
@@ -29,7 +28,6 @@ import (
 	pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto"
 	scerr "github.com/apache/incubator-servicecomb-service-center/server/error"
 	"github.com/apache/incubator-servicecomb-service-center/server/infra/registry"
-	"github.com/apache/incubator-servicecomb-service-center/server/mux"
 	"golang.org/x/net/context"
 	"strings"
 	"time"
@@ -279,42 +277,6 @@ func AddServiceVersionRule(ctx context.Context, domainProject string, consumer *
 	return nil
 }
 
-func DeleteDependencyForService(ctx context.Context, service *pb.MicroServiceKey) ([]registry.PluginOp, error) {
-	domainProject := service.Tenant
-	//删除依赖规则
-	conKey := apt.GenerateConsumerDependencyRuleKey(domainProject, service)
-	providerValue, err := TransferToMicroServiceDependency(ctx, conKey)
-	if err != nil {
-		return nil, err
-	}
-	opts := make([]registry.PluginOp, 0)
-	if providerValue != nil && len(providerValue.Dependency) != 0 {
-		providerRuleKey := ""
-		for _, providerRule := range providerValue.Dependency {
-			providerRuleKey = apt.GenerateProviderDependencyRuleKey(domainProject, providerRule)
-			consumers, err := TransferToMicroServiceDependency(ctx, providerRuleKey)
-			if err != nil {
-				return nil, err
-			}
-			opt, err := updateProviderDependencyRuleUtil(consumers, service, providerRuleKey)
-			if err != nil {
-				return nil, err
-			}
-			opts = append(opts, opt)
-		}
-	}
-	util.Logger().Infof("delete dependency rule, consumer Key is %s.", conKey)
-	opts = append(opts, registry.OpDel(registry.WithStrKey(conKey)))
-
-	//作为provider的依赖规则
-	providerKey := apt.GenerateProviderDependencyRuleKey(domainProject, service)
-
-	util.Logger().Infof("delete dependency rule, providerKey is %s", providerKey)
-	opts = append(opts, registry.OpDel(registry.WithStrKey(providerKey)))
-
-	return opts, nil
-}
-
 func TransferToMicroServiceDependency(ctx context.Context, key string) (*pb.MicroServiceDependency, error) {
 	microServiceDependency := &pb.MicroServiceDependency{
 		Dependency: []*pb.MicroServiceKey{},
@@ -505,8 +467,8 @@ func CreateDependencyRule(ctx context.Context, dep *Dependency) error {
 }
 
 func isDependencyAll(dep *pb.MicroServiceDependency) bool {
-	for _, servicedep := range dep.Dependency {
-		if servicedep.ServiceName == "*" {
+	for _, serviceDep := range dep.Dependency {
+		if serviceDep.ServiceName == "*" {
 			return true
 		}
 	}
@@ -604,7 +566,7 @@ func validateMicroServiceKey(in *pb.MicroServiceKey, fuzzyMatch bool) error {
 }
 
 func BadParamsResponse(detailErr string) *pb.CreateDependenciesResponse {
-	util.Logger().Errorf(nil, "Request params is invalid.")
+	util.Logger().Errorf(nil, "Request params is invalid.%s", detailErr)
 	if len(detailErr) == 0 {
 		detailErr = "Request params is invalid."
 	}
@@ -1054,10 +1016,15 @@ func (dr *DependencyRelation) getConsumerOfSameServiceNameAndAppId(provider *pb.
 	return allConsumers, nil
 }
 
-func DependencyLock(lockKey string) (*etcdsync.DLock, error) {
-	return mux.Lock(mux.MuxType(lockKey))
-}
-
-func NewDependencyLockKey(domainProject, env string) string {
-	return util.StringJoin([]string{"", "env-lock", domainProject, env}, "/")
+func DeleteDependencyForDeleteService(domainProject string, serviceId string, service *pb.MicroServiceKey) (registry.PluginOp, error) {
+	key := apt.GenerateConsumerDependencyQueueKey(domainProject, serviceId, "0")
+	conDep := new(pb.ConsumerDependency)
+	conDep.Consumer = service
+	conDep.Providers = []*pb.MicroServiceKey{}
+	conDep.Override = true
+	data, err := json.Marshal(conDep)
+	if err != nil {
+		return registry.PluginOp{}, err
+	}
+	return registry.OpPut(registry.WithStrKey(key), registry.WithValue(data)), nil
 }
diff --git a/server/service/util/dependency_test.go b/server/service/util/dependency_test.go
index c1326e9..546459e 100644
--- a/server/service/util/dependency_test.go
+++ b/server/service/util/dependency_test.go
@@ -33,9 +33,9 @@ func TestRefreshDependencyCache(t *testing.T) {
 }
 
 func TestDeleteDependencyForService(t *testing.T) {
-	_, err := DeleteDependencyForService(context.Background(), &proto.MicroServiceKey{})
-	if err == nil {
-		fmt.Printf(`DeleteDependencyForService failed`)
+	_, err := DeleteDependencyForDeleteService("", "", &proto.MicroServiceKey{})
+	if err != nil {
+		fmt.Printf(`DeleteDependencyForDeleteService failed`)
 		t.FailNow()
 	}
 

-- 
To stop receiving notification emails like this one, please contact
littlecui@apache.org.