You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@dubbo.apache.org by GitBox <gi...@apache.org> on 2020/08/02 13:23:55 UTC

[GitHub] [dubbo-go] zouyx commented on a change in pull request #665: Ftr: dynamic tag router

zouyx commented on a change in pull request #665:
URL: https://github.com/apache/dubbo-go/pull/665#discussion_r464077311



##########
File path: cluster/router/tag/router_rule.go
##########
@@ -22,9 +22,27 @@ import (
 	"github.com/apache/dubbo-go/common/yaml"
 )
 
+/**
+ * %YAML1.2
+ * ---
+ * force: true
+ * runtime: false
+ * enabled: true
+ * priority: 1
+ * key: demo-provider
+ * tags:
+ * - name: tag1
+ * addresses: [ip1, ip2]
+ * - name: tag2
+ * addresses: [ip3, ip4]
+ * ...
+ */
 // RouterRule RouterRule config read from config file or config center
 type RouterRule struct {
 	router.BaseRouterRule `yaml:",inline""`
+	Tags                  []Tag

Review comment:
       ```suggestion
   	tags                  []Tag
   ```
   Did you mean `tags`?

##########
File path: cluster/router/tag/tag_router.go
##########
@@ -63,7 +83,152 @@ func (c *tagRouter) Route(invokers []protocol.Invoker, url *common.URL, invocati
 	if len(invokers) == 0 {
 		return invokers
 	}
-	return filterUsingStaticTag(invokers, url, invocation)
+	if c.tagRouterRule == nil || !c.tagRouterRule.Valid || !c.tagRouterRule.Enabled {
+		return filterUsingStaticTag(invokers, url, invocation)
+	}
+	// since the rule can be changed by config center, we should copy one to use.
+	tagRouterRuleCopy := c.tagRouterRuleCopy()
+	tag, ok := invocation.Attachments()[constant.Tagkey]
+	if !ok {
+		tag = url.GetParam(constant.Tagkey, "")
+	}
+	var (
+		result    []protocol.Invoker
+		addresses []string
+	)
+	if tag != "" {

Review comment:
       ```suggestion
   	if len(tag)>0  {
   ```

##########
File path: cluster/router/tag/tag_router.go
##########
@@ -100,3 +265,163 @@ func isForceUseTag(url *common.URL, invocation protocol.Invocation) bool {
 	}
 	return false
 }
+
+type filter func(protocol.Invoker) bool
+
+func filterInvoker(invokers []protocol.Invoker, filters ...filter) []protocol.Invoker {
+	var res []protocol.Invoker
+OUTER:
+	for _, invoker := range invokers {
+		for _, filter := range filters {
+			if !filter(invoker) {
+				continue OUTER
+			}
+		}
+		res = append(res, invoker)
+	}
+	return res
+}
+
+// TODO 需要搬到 dubbogo/gost, 可以先 review

Review comment:
       Could you use English here?

##########
File path: cluster/router/tag/tag_router.go
##########
@@ -55,6 +65,16 @@ func (c *tagRouter) isEnabled() bool {
 	return c.enabled
 }
 
+func (c *tagRouter) SetApplication(app string) {
+	c.application = app
+}
+
+func (c *tagRouter) tagRouterRuleCopy() RouterRule {
+	fmt.Println(c.tagRouterRule, "fuck")

Review comment:
       ```suggestion
   	fmt.Println(c.tagRouterRule, "fuck")
   ```
   I think this feature has made you crazy :)

##########
File path: cluster/router/tag/tag.go
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package tag
+
+type Tag struct {

Review comment:
       Did you mean `tag`?
   ```suggestion
   type tag struct {
   ```

##########
File path: cluster/router/tag/tag_router.go
##########
@@ -63,7 +83,152 @@ func (c *tagRouter) Route(invokers []protocol.Invoker, url *common.URL, invocati
 	if len(invokers) == 0 {
 		return invokers
 	}
-	return filterUsingStaticTag(invokers, url, invocation)
+	if c.tagRouterRule == nil || !c.tagRouterRule.Valid || !c.tagRouterRule.Enabled {
+		return filterUsingStaticTag(invokers, url, invocation)
+	}
+	// since the rule can be changed by config center, we should copy one to use.
+	tagRouterRuleCopy := c.tagRouterRuleCopy()
+	tag, ok := invocation.Attachments()[constant.Tagkey]
+	if !ok {
+		tag = url.GetParam(constant.Tagkey, "")
+	}
+	var (
+		result    []protocol.Invoker
+		addresses []string
+	)
+	if tag != "" {
+		addresses, _ = tagRouterRuleCopy.getTagNameToAddresses()[tag]
+		// filter by dynamic tag group first
+		if len(addresses) > 0 {
+			filterAddressMatches := func(invoker protocol.Invoker) bool {
+				url := invoker.GetUrl()
+				if len(addresses) > 0 && checkAddressMatch(addresses, url.Ip, url.Port) {
+					return true
+				}
+				return false
+			}
+			result = filterInvoker(invokers, filterAddressMatches)
+			if len(result) > 0 || tagRouterRuleCopy.Force {
+				return result
+			}
+		} else {
+			// dynamic tag group doesn't have any item about the requested app OR it's null after filtered by
+			// dynamic tag group but force=false. check static tag
+			filter := func(invoker protocol.Invoker) bool {
+				if invoker.GetUrl().GetParam(constant.Tagkey, "") == tag {
+					return true
+				}
+				return false
+			}
+			result = filterInvoker(invokers, filter)
+		}
+		// If there's no tagged providers that can match the current tagged request. force.tag is set by default
+		// to false, which means it will invoke any providers without a tag unless it's explicitly disallowed.
+		if len(result) > 0 || isForceUseTag(url, invocation) {
+			return result
+		} else {
+			// FAILOVER: return all Providers without any tags.
+			filterAddressNotMatches := func(invoker protocol.Invoker) bool {
+				url := invoker.GetUrl()
+				if len(addresses) == 0 || !checkAddressMatch(tagRouterRuleCopy.getAddresses(), url.Ip, url.Port) {
+					return true
+				}
+				return false
+			}
+			filterTagIsEmpty := func(invoker protocol.Invoker) bool {
+				if invoker.GetUrl().GetParam(constant.Tagkey, "") == "" {
+					return true
+				}
+				return false
+			}
+			return filterInvoker(invokers, filterAddressNotMatches, filterTagIsEmpty)
+		}
+	} else {
+		// return all addresses in dynamic tag group.
+		addresses = tagRouterRuleCopy.getAddresses()
+		if len(addresses) > 0 {
+			filterAddressNotMatches := func(invoker protocol.Invoker) bool {
+				url := invoker.GetUrl()
+				if len(addresses) == 0 || !checkAddressMatch(addresses, url.Ip, url.Port) {
+					return true
+				}
+				return false
+			}
+			result = filterInvoker(invokers, filterAddressNotMatches)
+			// 1. all addresses are in dynamic tag group, return empty list.
+			if len(result) == 0 {
+				return result
+			}
+			// 2. if there are some addresses that are not in any dynamic tag group, continue to filter using the
+			// static tag group.
+		}
+		filter := func(invoker protocol.Invoker) bool {
+			localTag := invoker.GetUrl().GetParam(constant.Tagkey, "")
+			return localTag == "" || !(tagRouterRuleCopy.hasTag(localTag))
+		}
+		return filterInvoker(result, filter)
+	}
+}
+
+func (c *tagRouter) Process(event *config_center.ConfigChangeEvent) {
+	logger.Infof("Notification of tag rule, change type is:[%s] , raw rule is:[%v]", event.ConfigType, event.Value)
+	if remoting.EventTypeDel == event.ConfigType {
+		c.tagRouterRule = nil
+		return
+	} else {
+		content, ok := event.Value.(string)
+		if !ok {
+			msg := fmt.Sprintf("Convert event content fail,raw content:[%s] ", event.Value)
+			logger.Error(msg)
+			return
+		}
+
+		routerRule, err := getRule(content)
+		if err != nil {
+			logger.Errorf("Parse dynamic tag router rule fail,error:[%s] ", err)
+			return
+		}
+		c.tagRouterRule = routerRule
+		return
+	}
+}
+
+func (c *tagRouter) Notify(invokers []protocol.Invoker) {
+	if len(invokers) == 0 {
+		return
+	}
+	invoker := invokers[0]
+	url := invoker.GetUrl()
+	providerApplication := url.GetParam(constant.RemoteApplicationKey, "")
+	if providerApplication == "" {
+		logger.Error("TagRouter must getConfig from or subscribe to a specific application, but the application " +
+			"in this TagRouter is not specified.")
+		return
+	}
+	dynamicConfiguration := config.GetEnvInstance().GetDynamicConfiguration()
+	if dynamicConfiguration == nil {
+		logger.Error("Get dynamicConfiguration fail, dynamicConfiguration is nil, init config center plugin please")
+		return
+	}
+
+	if providerApplication != c.application {
+		dynamicConfiguration.RemoveListener(c.application+constant.TagRouterRuleSuffix, c)
+	}
+
+	routerKey := providerApplication + constant.TagRouterRuleSuffix
+	dynamicConfiguration.AddListener(routerKey, c)
+	//get rule
+	rule, err := dynamicConfiguration.GetRule(routerKey, config_center.WithGroup(config_center.DEFAULT_GROUP))
+	if len(rule) == 0 || err != nil {
+		logger.Errorf("Get rule fail, config rule{%s},  error{%v}", rule, err)
+		return
+	}
+	if rule != "" {

Review comment:
       Use `len(rule) > 0` here instead of `rule != ""`.

##########
File path: cluster/router/tag/tag_router.go
##########
@@ -63,7 +83,152 @@ func (c *tagRouter) Route(invokers []protocol.Invoker, url *common.URL, invocati
 	if len(invokers) == 0 {
 		return invokers
 	}
-	return filterUsingStaticTag(invokers, url, invocation)
+	if c.tagRouterRule == nil || !c.tagRouterRule.Valid || !c.tagRouterRule.Enabled {
+		return filterUsingStaticTag(invokers, url, invocation)
+	}
+	// since the rule can be changed by config center, we should copy one to use.
+	tagRouterRuleCopy := c.tagRouterRuleCopy()
+	tag, ok := invocation.Attachments()[constant.Tagkey]
+	if !ok {
+		tag = url.GetParam(constant.Tagkey, "")
+	}
+	var (
+		result    []protocol.Invoker
+		addresses []string
+	)
+	if tag != "" {
+		addresses, _ = tagRouterRuleCopy.getTagNameToAddresses()[tag]
+		// filter by dynamic tag group first
+		if len(addresses) > 0 {
+			filterAddressMatches := func(invoker protocol.Invoker) bool {
+				url := invoker.GetUrl()
+				if len(addresses) > 0 && checkAddressMatch(addresses, url.Ip, url.Port) {
+					return true
+				}
+				return false
+			}
+			result = filterInvoker(invokers, filterAddressMatches)
+			if len(result) > 0 || tagRouterRuleCopy.Force {
+				return result
+			}
+		} else {
+			// dynamic tag group doesn't have any item about the requested app OR it's null after filtered by
+			// dynamic tag group but force=false. check static tag
+			filter := func(invoker protocol.Invoker) bool {
+				if invoker.GetUrl().GetParam(constant.Tagkey, "") == tag {
+					return true
+				}
+				return false
+			}
+			result = filterInvoker(invokers, filter)
+		}
+		// If there's no tagged providers that can match the current tagged request. force.tag is set by default
+		// to false, which means it will invoke any providers without a tag unless it's explicitly disallowed.
+		if len(result) > 0 || isForceUseTag(url, invocation) {
+			return result
+		} else {
+			// FAILOVER: return all Providers without any tags.
+			filterAddressNotMatches := func(invoker protocol.Invoker) bool {
+				url := invoker.GetUrl()
+				if len(addresses) == 0 || !checkAddressMatch(tagRouterRuleCopy.getAddresses(), url.Ip, url.Port) {
+					return true
+				}
+				return false
+			}
+			filterTagIsEmpty := func(invoker protocol.Invoker) bool {
+				if invoker.GetUrl().GetParam(constant.Tagkey, "") == "" {
+					return true
+				}
+				return false
+			}
+			return filterInvoker(invokers, filterAddressNotMatches, filterTagIsEmpty)
+		}
+	} else {
+		// return all addresses in dynamic tag group.
+		addresses = tagRouterRuleCopy.getAddresses()
+		if len(addresses) > 0 {
+			filterAddressNotMatches := func(invoker protocol.Invoker) bool {
+				url := invoker.GetUrl()
+				if len(addresses) == 0 || !checkAddressMatch(addresses, url.Ip, url.Port) {
+					return true
+				}
+				return false
+			}
+			result = filterInvoker(invokers, filterAddressNotMatches)
+			// 1. all addresses are in dynamic tag group, return empty list.
+			if len(result) == 0 {
+				return result
+			}
+			// 2. if there are some addresses that are not in any dynamic tag group, continue to filter using the
+			// static tag group.
+		}
+		filter := func(invoker protocol.Invoker) bool {
+			localTag := invoker.GetUrl().GetParam(constant.Tagkey, "")
+			return localTag == "" || !(tagRouterRuleCopy.hasTag(localTag))
+		}
+		return filterInvoker(result, filter)
+	}
+}
+
+func (c *tagRouter) Process(event *config_center.ConfigChangeEvent) {
+	logger.Infof("Notification of tag rule, change type is:[%s] , raw rule is:[%v]", event.ConfigType, event.Value)
+	if remoting.EventTypeDel == event.ConfigType {
+		c.tagRouterRule = nil
+		return
+	} else {
+		content, ok := event.Value.(string)
+		if !ok {
+			msg := fmt.Sprintf("Convert event content fail,raw content:[%s] ", event.Value)
+			logger.Error(msg)

Review comment:
       Use `logger.Errorf` here?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org
For additional commands, e-mail: notifications-help@dubbo.apache.org