You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@servicecomb.apache.org by GitBox <gi...@apache.org> on 2018/01/29 08:19:25 UTC

[GitHub] little-cui closed pull request #258: SCB-269 remove lock for dependency handle

little-cui closed pull request #258: SCB-269 remove lock for dependency handle
URL: https://github.com/apache/incubator-servicecomb-service-center/pull/258
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pkg/tree/tree.go b/pkg/tree/tree.go
new file mode 100644
index 00000000..ca3136ac
--- /dev/null
+++ b/pkg/tree/tree.go
@@ -0,0 +1,67 @@
+package tree
+
+//The tree is a binary sort tree
+type tree struct {
+	root *Node
+	isAddToLeft func(node *Node, addRes interface{}) bool
+}
+
+func NewTree(isAddToLeft func(node *Node, addRes interface{}) bool) *tree {
+	return &tree{
+		isAddToLeft: isAddToLeft,
+	}
+}
+
+type Node struct {
+	Res         interface{}
+	left, right *Node
+}
+
+func (t *tree) GetRoot()*Node {
+	return t.root
+}
+
+//add res into tree
+func (t *tree) AddNode(res interface{}) *Node {
+	return t.addNode(t.root, res)
+}
+
+func (t *tree) addNode(n *Node, res interface{}) *Node{
+	if n == nil {
+		n = new(Node)
+		n.Res = res
+		if t.root == nil {
+			t.root = n
+		}
+		return n
+	}
+	if t.isAddToLeft(n, res) {
+		n.left = t.addNode(n.left, res)
+	} else {
+		n.right = t.addNode(n.right, res)
+	}
+	return n
+}
+
+//in-order traversal: handle is the func that deals with each res, n is the node the traversal starts from
+func (t *tree) InOrderTraversal(n *Node, handle func(res interface{}) error) error {
+	if n == nil {
+		return nil
+	}
+
+	err := t.InOrderTraversal(n.left, handle)
+	if err != nil {
+		return err
+	}
+	err = handle(n.Res)
+	if err != nil {
+		return err
+	}
+	err = t.InOrderTraversal(n.right, handle)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+//todo: add asynchronous handling, i.e. run the handle func as a goroutine: go handle
\ No newline at end of file
diff --git a/pkg/tree/tree_test.go b/pkg/tree/tree_test.go
new file mode 100644
index 00000000..902b14a4
--- /dev/null
+++ b/pkg/tree/tree_test.go
@@ -0,0 +1,39 @@
+package tree
+
+import (
+	"fmt"
+	"testing"
+	"reflect"
+)
+
+func TestTree(t *testing.T) {
+	compareFunc := func(node *Node, addRes interface{}) bool {
+		k := addRes.(int)
+		kCompare := node.Res.(int)
+		if k > kCompare {
+			return false
+		}
+		return true
+	}
+	testSlice := []int{6,3,7,2,4,5}
+	targetSlice := []int{2,3,4,5,6,7}
+	slice := testSlice[:0]
+	handle := func(res interface{}) error {
+		slice = append(slice, res.(int))
+		return nil
+	}
+
+
+	testTree := NewTree(compareFunc)
+
+	for _, v := range testSlice {
+		testTree.AddNode(v)
+	}
+
+
+	testTree.InOrderTraversal(testTree.GetRoot(), handle)
+	if !reflect.DeepEqual(slice, targetSlice) {
+		fmt.Printf(`TestTree failed`)
+		t.FailNow()
+	}
+}
\ No newline at end of file
diff --git a/server/service/event/dependency_event_handler.go b/server/service/event/dependency_event_handler.go
index d1bd3118..67f2d465 100644
--- a/server/service/event/dependency_event_handler.go
+++ b/server/service/event/dependency_event_handler.go
@@ -30,6 +30,7 @@ import (
 	"github.com/coreos/etcd/mvcc/mvccpb"
 	"golang.org/x/net/context"
 	"time"
+	"github.com/apache/incubator-servicecomb-service-center/pkg/tree"
 )
 
 type DependencyEventHandler struct {
@@ -104,6 +105,15 @@ func NewDependencyEventHandlerResource(dep *pb.ConsumerDependency, kv *mvccpb.Ke
 	}
 }
 
+func isAddToLeft(centerNode *tree.Node, addRes interface{}) bool {
+	res := addRes.(*DependencyEventHandlerResource)
+	compareRes := centerNode.Res.(*DependencyEventHandlerResource)
+	if res.kv.ModRevision > compareRes.kv.ModRevision {
+		return false
+	}
+	return true
+}
+
 func (h *DependencyEventHandler) Handle() error {
 	key := core.GetServiceDependencyQueueRootKey("")
 	resp, err := store.Store().DependencyQueue().Search(context.Background(),
@@ -119,10 +129,10 @@ func (h *DependencyEventHandler) Handle() error {
 		return nil
 	}
 
-	lenKvs := len(resp.Kvs)
-	resourcesMap := make(map[string][]*DependencyEventHandlerResource, lenKvs)
-
 	ctx := context.Background()
+
+	dependencyTree := tree.NewTree(isAddToLeft)
+
 	for _, kv := range resp.Kvs {
 		r := &pb.ConsumerDependency{}
 		consumerId, domainProject, data := pb.GetInfoFromDependencyQueueKV(kv)
@@ -138,102 +148,58 @@ func (h *DependencyEventHandler) Handle() error {
 			continue
 		}
 
-		lockKey := serviceUtil.NewDependencyLockKey(domainProject, r.Consumer.Environment)
 		res := NewDependencyEventHandlerResource(r, kv, domainProject)
-		resources := resourcesMap[lockKey]
-		if resources == nil {
-			resources = make([]*DependencyEventHandlerResource, 0, lenKvs)
-		}
-		resources = append(resources, res)
-		resourcesMap[lockKey] = resources
-	}
 
-	dependencyRuleHandleResults := make(chan error, len(resourcesMap))
-	for lockKey, resources := range resourcesMap {
-		go func(lockKey string, resources []*DependencyEventHandlerResource) {
-			err := h.dependencyRuleHandle(ctx, lockKey, resources)
-			dependencyRuleHandleResults <- err
-		}(lockKey, resources)
-	}
-	var lastErr error
-	finishedCount := 0
-	for err := range dependencyRuleHandleResults {
-		finishedCount++
-		if err != nil {
-			lastErr = err
-		}
-		if finishedCount == len(resourcesMap) {
-			close(dependencyRuleHandleResults)
-		}
+		dependencyTree.AddNode(res)
 	}
-	return lastErr
+
+	return dependencyTree.InOrderTraversal(dependencyTree.GetRoot(), h.dependencyRuleHandle)
 }
 
-func (h *DependencyEventHandler) dependencyRuleHandle(ctx context.Context, lockKey string, resources []*DependencyEventHandlerResource) error {
-	lock, err := serviceUtil.DependencyLock(lockKey)
-	if err != nil {
-		util.Logger().Errorf(err, "create dependency rule locker failed")
-		return err
+func (h *DependencyEventHandler) dependencyRuleHandle(res interface{}) error {
+	ctx := context.Background()
+	dependencyEventHandlerRes := res.(*DependencyEventHandlerResource)
+	r := dependencyEventHandlerRes.dep
+	consumerFlag := util.StringJoin([]string{r.Consumer.AppId, r.Consumer.ServiceName, r.Consumer.Version}, "/")
+
+
+	domainProject := dependencyEventHandlerRes.domainProject
+	consumerInfo := pb.DependenciesToKeys([]*pb.MicroServiceKey{r.Consumer}, domainProject)[0]
+	providersInfo := pb.DependenciesToKeys(r.Providers, domainProject)
+
+	var dep serviceUtil.Dependency
+	var err error
+	dep.DomainProject = domainProject
+	dep.Consumer = consumerInfo
+	dep.ProvidersRule = providersInfo
+	if r.Override {
+		err = serviceUtil.CreateDependencyRule(ctx, &dep)
+	} else {
+		err = serviceUtil.AddDependencyRule(ctx, &dep)
 	}
-	defer lock.Unlock()
-	for _, res := range resources {
-		r := res.dep
-		consumerFlag := util.StringJoin([]string{r.Consumer.AppId, r.Consumer.ServiceName, r.Consumer.Version}, "/")
 
-		domainProject := res.domainProject
-		consumerInfo := pb.DependenciesToKeys([]*pb.MicroServiceKey{r.Consumer}, domainProject)[0]
-		providersInfo := pb.DependenciesToKeys(r.Providers, domainProject)
-
-		consumerId, err := serviceUtil.GetServiceId(ctx, consumerInfo)
-		if err != nil {
-			util.Logger().Errorf(err, "modify dependency rule failed, override: %t, consumer %s", r.Override, consumerFlag)
-			return fmt.Errorf("get consumer %s id failed, override: %t, %s", consumerFlag, r.Override, err.Error())
-		}
-		if len(consumerId) == 0 {
-			util.Logger().Errorf(nil, "maintain dependency failed, override: %t, consumer %s does not exist",
-				r.Override, consumerFlag)
-
-			if err = h.removeKV(ctx, res.kv); err != nil {
-				util.Logger().Errorf(err, "remove dependency rule failed, override: %t, consumer %s", r.Override, consumerFlag)
-				return err
-			}
-			continue
-		}
-
-		var dep serviceUtil.Dependency
-		dep.DomainProject = domainProject
-		dep.Consumer = consumerInfo
-		dep.ProvidersRule = providersInfo
-		dep.ConsumerId = consumerId
-		if r.Override {
-			err = serviceUtil.CreateDependencyRule(ctx, &dep)
-		} else {
-			err = serviceUtil.AddDependencyRule(ctx, &dep)
-		}
-
-		if err != nil {
-			util.Logger().Errorf(err, "modify dependency rule failed, override: %t, consumer %s", r.Override, consumerFlag)
-			return fmt.Errorf("override: %t, consumer is %s, %s", r.Override, consumerFlag, err.Error())
-		}
-
-		if err = h.removeKV(ctx, res.kv); err != nil {
-			util.Logger().Errorf(err, "remove dependency rule failed, override: %t, consumer %s", r.Override, consumerFlag)
-			return err
-		}
+	if err != nil {
+		util.Logger().Errorf(err, "modify dependency rule failed, override: %t, consumer %s", r.Override, consumerFlag)
+		return fmt.Errorf("override: %t, consumer is %s, %s", r.Override, consumerFlag, err.Error())
+	}
 
-		util.Logger().Infof("maintain dependency %v successfully, override: %t", r, r.Override)
+	if err = h.removeKV(ctx, dependencyEventHandlerRes.kv); err != nil {
+		util.Logger().Errorf(err, "remove dependency rule failed, override: %t, consumer %s", r.Override, consumerFlag)
+		return err
 	}
+
+	util.Logger().Infof("maintain dependency %v successfully, override: %t", r, r.Override)
 	return nil
 }
 
 func (h *DependencyEventHandler) removeKV(ctx context.Context, kv *mvccpb.KeyValue) error {
-	dresp, err := backend.Registry().TxnWithCmp(ctx, []registry.PluginOp{registry.OpDel(registry.WithKey(kv.Key))},
+	dResp, err := backend.Registry().TxnWithCmp(ctx, []registry.PluginOp{registry.OpDel(registry.WithKey(kv.Key))},
 		[]registry.CompareOp{registry.OpCmp(registry.CmpVer(kv.Key), registry.CMP_EQUAL, kv.Version)},
 		nil)
 	if err != nil {
 		return fmt.Errorf("can not remove the dependency %s request, %s", util.BytesToStringWithNoCopy(kv.Key), err.Error())
 	}
-	if !dresp.Succeeded {
+	if !dResp.Succeeded {
 		util.Logger().Infof("the dependency %s request is changed", util.BytesToStringWithNoCopy(kv.Key))
 	}
 	return nil
diff --git a/server/service/microservices.go b/server/service/microservices.go
index dcaa4be6..60463a88 100644
--- a/server/service/microservices.go
+++ b/server/service/microservices.go
@@ -278,19 +278,12 @@ func (s *MicroServiceService) DeleteServicePri(ctx context.Context, serviceId st
 	}
 
 	//??????
-	lock, err := serviceUtil.DependencyLock(serviceUtil.NewDependencyLockKey(domainProject, service.Environment))
-	if err != nil {
-		util.Logger().Errorf(err, "%s micro-service failed, serviceId is %s: inner err, create lock failed.", title, serviceId)
-		return pb.CreateResponse(scerr.ErrUnavailableBackend, err.Error()), err
-	}
-
-	defer lock.Unlock()
-	optsTmp, err := serviceUtil.DeleteDependencyForService(ctx, serviceKey)
+	optDeleteDep, err := serviceUtil.DeleteDependencyForDeleteService(domainProject, serviceId, serviceKey)
 	if err != nil {
 		util.Logger().Errorf(err, "%s micro-service failed, serviceId is %s: inner err, delete dependency failed.", title, serviceId)
 		return pb.CreateResponse(scerr.ErrInternal, err.Error()), err
 	}
-	opts = append(opts, optsTmp...)
+	opts = append(opts, optDeleteDep)
 
 	//??????
 	opts = append(opts, registry.OpDel(
diff --git a/server/service/service_dependency.go b/server/service/service_dependency.go
index c5857f1f..3e2d38f9 100644
--- a/server/service/service_dependency.go
+++ b/server/service/service_dependency.go
@@ -51,7 +51,7 @@ func (s *MicroServiceService) AddOrUpdateDependencies(ctx context.Context, depen
 	opts := make([]registry.PluginOp, 0, len(dependencyInfos))
 	domainProject := util.ParseDomainProject(ctx)
 	for _, dependencyInfo := range dependencyInfos {
-		if len(dependencyInfo.Providers) == 0 || dependencyInfo.Consumer == nil {
+		if (len(dependencyInfo.Providers) == 0  && !override) || dependencyInfo.Consumer == nil {
 			return serviceUtil.BadParamsResponse("Provider is invalid").Response, nil
 		}
 
diff --git a/server/service/service_dependency_test.go b/server/service/service_dependency_test.go
index 719fd00b..90803c50 100644
--- a/server/service/service_dependency_test.go
+++ b/server/service/service_dependency_test.go
@@ -26,7 +26,7 @@ import (
 var deh event.DependencyEventHandler
 
 var _ = Describe("'Dependency' service", func() {
-	Describe("execute 'create' operartion", func() {
+	Describe("execute 'create' operation", func() {
 		var (
 			consumerId1 string
 			consumerId2 string
@@ -283,8 +283,21 @@ var _ = Describe("'Dependency' service", func() {
 					Version:     "1.0.0",
 				}
 
-				By("add latest")
+				By("add provider is empty")
 				respCreateDependency, err := serviceResource.CreateDependenciesForMicroServices(getContext(), &pb.CreateDependenciesRequest{
+					Dependencies: []*pb.ConsumerDependency{
+						{
+							Consumer: consumer,
+							Providers: []*pb.MicroServiceKey{
+							},
+						},
+					},
+				})
+				Expect(err).To(BeNil())
+				Expect(respCreateDependency.Response.Code).To(Equal(pb.Response_SUCCESS))
+
+				By("add latest")
+				respCreateDependency, err = serviceResource.CreateDependenciesForMicroServices(getContext(), &pb.CreateDependenciesRequest{
 					Dependencies: []*pb.ConsumerDependency{
 						{
 							Consumer: consumer,
diff --git a/server/service/util/dependency.go b/server/service/util/dependency.go
index 68c6bf71..3e9e9290 100644
--- a/server/service/util/dependency.go
+++ b/server/service/util/dependency.go
@@ -21,7 +21,6 @@ import (
 	"errors"
 	"fmt"
 	"github.com/apache/incubator-servicecomb-service-center/pkg/cache"
-	"github.com/apache/incubator-servicecomb-service-center/pkg/etcdsync"
 	"github.com/apache/incubator-servicecomb-service-center/pkg/util"
 	apt "github.com/apache/incubator-servicecomb-service-center/server/core"
 	"github.com/apache/incubator-servicecomb-service-center/server/core/backend"
@@ -29,7 +28,6 @@ import (
 	pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto"
 	scerr "github.com/apache/incubator-servicecomb-service-center/server/error"
 	"github.com/apache/incubator-servicecomb-service-center/server/infra/registry"
-	"github.com/apache/incubator-servicecomb-service-center/server/mux"
 	"golang.org/x/net/context"
 	"strings"
 	"time"
@@ -279,42 +277,6 @@ func AddServiceVersionRule(ctx context.Context, domainProject string, consumer *
 	return nil
 }
 
-func DeleteDependencyForService(ctx context.Context, service *pb.MicroServiceKey) ([]registry.PluginOp, error) {
-	domainProject := service.Tenant
-	//??????
-	conKey := apt.GenerateConsumerDependencyRuleKey(domainProject, service)
-	providerValue, err := TransferToMicroServiceDependency(ctx, conKey)
-	if err != nil {
-		return nil, err
-	}
-	opts := make([]registry.PluginOp, 0)
-	if providerValue != nil && len(providerValue.Dependency) != 0 {
-		providerRuleKey := ""
-		for _, providerRule := range providerValue.Dependency {
-			providerRuleKey = apt.GenerateProviderDependencyRuleKey(domainProject, providerRule)
-			consumers, err := TransferToMicroServiceDependency(ctx, providerRuleKey)
-			if err != nil {
-				return nil, err
-			}
-			opt, err := updateProviderDependencyRuleUtil(consumers, service, providerRuleKey)
-			if err != nil {
-				return nil, err
-			}
-			opts = append(opts, opt)
-		}
-	}
-	util.Logger().Infof("delete dependency rule, consumer Key is %s.", conKey)
-	opts = append(opts, registry.OpDel(registry.WithStrKey(conKey)))
-
-	//??provider?????
-	providerKey := apt.GenerateProviderDependencyRuleKey(domainProject, service)
-
-	util.Logger().Infof("delete dependency rule, providerKey is %s", providerKey)
-	opts = append(opts, registry.OpDel(registry.WithStrKey(providerKey)))
-
-	return opts, nil
-}
-
 func TransferToMicroServiceDependency(ctx context.Context, key string) (*pb.MicroServiceDependency, error) {
 	microServiceDependency := &pb.MicroServiceDependency{
 		Dependency: []*pb.MicroServiceKey{},
@@ -505,8 +467,8 @@ func CreateDependencyRule(ctx context.Context, dep *Dependency) error {
 }
 
 func isDependencyAll(dep *pb.MicroServiceDependency) bool {
-	for _, servicedep := range dep.Dependency {
-		if servicedep.ServiceName == "*" {
+	for _, serviceDep := range dep.Dependency {
+		if serviceDep.ServiceName == "*" {
 			return true
 		}
 	}
@@ -604,7 +566,7 @@ func validateMicroServiceKey(in *pb.MicroServiceKey, fuzzyMatch bool) error {
 }
 
 func BadParamsResponse(detailErr string) *pb.CreateDependenciesResponse {
-	util.Logger().Errorf(nil, "Request params is invalid.")
+	util.Logger().Errorf(nil, "Request params is invalid.%s", detailErr)
 	if len(detailErr) == 0 {
 		detailErr = "Request params is invalid."
 	}
@@ -1054,10 +1016,15 @@ func (dr *DependencyRelation) getConsumerOfSameServiceNameAndAppId(provider *pb.
 	return allConsumers, nil
 }
 
-func DependencyLock(lockKey string) (*etcdsync.DLock, error) {
-	return mux.Lock(mux.MuxType(lockKey))
-}
-
-func NewDependencyLockKey(domainProject, env string) string {
-	return util.StringJoin([]string{"", "env-lock", domainProject, env}, "/")
+func DeleteDependencyForDeleteService(domainProject string, serviceId string, service *pb.MicroServiceKey) (registry.PluginOp, error) {
+	key := apt.GenerateConsumerDependencyQueueKey(domainProject, serviceId, "0")
+	conDep := new(pb.ConsumerDependency)
+	conDep.Consumer = service
+	conDep.Providers = []*pb.MicroServiceKey{}
+	conDep.Override = true
+	data, err := json.Marshal(conDep)
+	if err != nil {
+		return registry.PluginOp{}, err
+	}
+	return registry.OpPut(registry.WithStrKey(key), registry.WithValue(data)), nil
 }
diff --git a/server/service/util/dependency_test.go b/server/service/util/dependency_test.go
index c1326e9f..546459e0 100644
--- a/server/service/util/dependency_test.go
+++ b/server/service/util/dependency_test.go
@@ -33,9 +33,9 @@ func TestRefreshDependencyCache(t *testing.T) {
 }
 
 func TestDeleteDependencyForService(t *testing.T) {
-	_, err := DeleteDependencyForService(context.Background(), &proto.MicroServiceKey{})
-	if err == nil {
-		fmt.Printf(`DeleteDependencyForService failed`)
+	_, err := DeleteDependencyForDeleteService("", "", &proto.MicroServiceKey{})
+	if err != nil {
+		fmt.Printf(`DeleteDependencyForDeleteService failed`)
 		t.FailNow()
 	}
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services