You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@dubbo.apache.org by GitBox <gi...@apache.org> on 2020/08/17 02:59:29 UTC

[GitHub] [dubbo-go] zouyx commented on a change in pull request #701: consul service discovery

zouyx commented on a change in pull request #701:
URL: https://github.com/apache/dubbo-go/pull/701#discussion_r471207092



##########
File path: registry/consul/service_discovery.go
##########
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package consul
+
+import (
+	"fmt"
+	"strconv"
+	"strings"
+	"sync"
+	"time"
+)
+
+import (
+	"github.com/dubbogo/gost/container/set"
+	"github.com/dubbogo/gost/page"
+	consul "github.com/hashicorp/consul/api"
+	"github.com/hashicorp/consul/api/watch"
+	perrors "github.com/pkg/errors"
+)
+
+import (
+	"github.com/apache/dubbo-go/common"
+	"github.com/apache/dubbo-go/common/constant"
+	"github.com/apache/dubbo-go/common/extension"
+	"github.com/apache/dubbo-go/common/logger"
+	"github.com/apache/dubbo-go/config"
+	"github.com/apache/dubbo-go/registry"
+)
+
+const (
+	PageSize = "pageSize"
+	Enable   = "enable"
+)
+
+const (
+	CHECK_PASS_INTERVAL = "consul-check-pass-interval"
+	// default time-to-live in millisecond
+	DEFAULT_CHECK_PASS_INTERVAL = 16000
+	QUERY_TAG                   = "consul_query_tag"
+	ACL_TOKEN                   = "acl-token"
+	// default deregister critical server after
+	DEFAULT_DEREGISTER_TIME = "20s"
+	DEFAULT_WATCH_TIMEOUT   = 60 * 1000
+	WATCH_TIMEOUT           = "consul-watch-timeout"
+	DEREGISTER_AFTER        = "consul-deregister-critical-service-after"
+)
+
+var (
+	// 16 would be enough. We won't use concurrentMap because in most cases, there are not race condition
+	instanceMap = make(map[string]registry.ServiceDiscovery, 16)
+	initLock    sync.Mutex
+)
+
+// init will put the service discovery into extension
+func init() {
+	extension.SetServiceDiscovery(constant.CONSUL_KEY, newConsulServiceDiscovery)
+}
+
+// newConsulServiceDiscovery will create new service discovery instance
+// use double-check pattern to reduce race condition
+func newConsulServiceDiscovery(name string) (registry.ServiceDiscovery, error) {
+	instance, ok := instanceMap[name]
+	if ok {
+		return instance, nil
+	}
+
+	initLock.Lock()
+	defer initLock.Unlock()
+
+	// double check
+	instance, ok = instanceMap[name]
+	if ok {
+		return instance, nil
+	}
+
+	sdc, ok := config.GetBaseConfig().GetServiceDiscoveries(name)
+	if !ok || len(sdc.RemoteRef) == 0 {
+		return nil, perrors.New("could not init the instance because the config is invalid")
+	}
+
+	remoteConfig, ok := config.GetBaseConfig().GetRemoteConfig(sdc.RemoteRef)
+	if !ok {
+		return nil, perrors.New("could not find the remote config for name: " + sdc.RemoteRef)
+	}
+
+	descriptor := fmt.Sprintf("consul-service-discovery[%s]", remoteConfig.Address)
+
+	pageSize := 20
+	if remoteConfig.Params != nil {
+		if tmp, OK := remoteConfig.Params[PageSize]; OK {
+			intTmp, err := strconv.Atoi(tmp)
+			if err == nil && intTmp > 20 {
+				pageSize = intTmp
+			}
+		}
+	}
+	return &consulServiceDiscovery{
+		address:    remoteConfig.Address,
+		descriptor: descriptor,
+		PageSize:   pageSize,
+		ttl:        make(map[string]chan struct{}),
+	}, nil
+}
+
+// nacosServiceDiscovery is the implementation of service discovery based on nacos.
+// There is a problem, the go client for nacos does not support the id field.
+// we will use the metadata to store the id of ServiceInstance
+type consulServiceDiscovery struct {
+	group string
+	// descriptor is a short string about the basic information of this instance
+	descriptor string
+	// Consul client.
+	consulClient      *consul.Client
+	PageSize          int
+	serviceUrl        common.URL
+	checkPassInterval int64
+	tag               string
+	tags              []string
+	address           string
+	ttl               map[string]chan struct{}
+	*consul.Config
+}
+
+func (csd *consulServiceDiscovery) Init(registryURL common.URL) error {
+	csd.serviceUrl = registryURL
+	csd.checkPassInterval = registryURL.GetParamInt(CHECK_PASS_INTERVAL, DEFAULT_CHECK_PASS_INTERVAL)
+	csd.tag = registryURL.GetParam(QUERY_TAG, "")
+	csd.tags = strings.Split(registryURL.GetParam("tags", ""), ",")
+	aclToken := registryURL.GetParam(ACL_TOKEN, "")
+	csd.Config = &consul.Config{Address: csd.address, Token: aclToken}
+	client, err := consul.NewClient(csd.Config)
+	if err != nil {
+		return perrors.WithMessage(err, "create consul client failed.")
+	}
+	csd.consulClient = client
+	return nil
+}
+
+func (csd *consulServiceDiscovery) String() string {
+	return csd.descriptor
+}
+
+func (csd *consulServiceDiscovery) Destroy() error {
+	csd.consulClient = nil
+	for _, t := range csd.ttl {
+		close(t)
+	}
+	csd.ttl = nil
+	return nil
+}
+
+func (csd *consulServiceDiscovery) Register(instance registry.ServiceInstance) error {
+	ins, _ := csd.buildRegisterInstance(instance)
+	err := csd.consulClient.Agent().ServiceRegister(ins)
+	if err != nil {
+		return perrors.WithMessage(err, "consul could not register the instance. "+instance.GetServiceName())
+	}
+
+	return csd.registerTtl(instance)
+}
+
+func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance) error {
+	checkID := buildID(instance)
+
+	stopChan := make(chan struct{})
+	csd.ttl[buildID(instance)] = stopChan
+
+	period := time.Duration(csd.checkPassInterval/8) * time.Millisecond
+	timer := time.NewTimer(period)
+	go func() {
+		for {
+			select {
+			case <-timer.C:
+				timer.Reset(period)
+				err := csd.consulClient.Agent().PassTTL(checkID, "")
+				if err != nil {
+					logger.Warnf("pass ttl heartbeat fail:%v", err)
+					break
+				}
+				logger.Debugf("passed ttl heartbeat for %s", checkID)
+				break
+			case <-stopChan:
+				logger.Info("ttl %s for service %s is stopped", checkID, instance.GetServiceName())
+				return
+			}
+		}
+	}()
+	return nil
+}
+
+func (csd *consulServiceDiscovery) Update(instance registry.ServiceInstance) error {
+	ins, _ := csd.buildRegisterInstance(instance)
+	err := csd.consulClient.Agent().ServiceDeregister(buildID(instance))
+	if err != nil {
+		logger.Warnf("unregister instance %s fail:%v", instance.GetServiceName(), err)
+	}
+	return csd.consulClient.Agent().ServiceRegister(ins)
+}
+
+func (csd *consulServiceDiscovery) Unregister(instance registry.ServiceInstance) error {
+	err := csd.consulClient.Agent().ServiceDeregister(buildID(instance))
+	if err != nil {
+		logger.Errorf("unregister service instance %s,error: %v", instance.GetId(), err)
+		return err
+	}
+	stopChanel, ok := csd.ttl[buildID(instance)]
+	if ok {
+		close(stopChanel)
+		delete(csd.ttl, buildID(instance))
+	}

Review comment:
       ```suggestion
   	if !ok {
   		return error
   	}
   ```
   
   I think you should change this to check `!ok`, because the previous `if` block is the not-OK case...

##########
File path: registry/consul/service_discovery.go
##########
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package consul
+
+import (
+	"fmt"
+	"strconv"
+	"strings"
+	"sync"
+	"time"
+)
+
+import (
+	"github.com/dubbogo/gost/container/set"
+	"github.com/dubbogo/gost/page"
+	consul "github.com/hashicorp/consul/api"
+	"github.com/hashicorp/consul/api/watch"
+	perrors "github.com/pkg/errors"
+)
+
+import (
+	"github.com/apache/dubbo-go/common"
+	"github.com/apache/dubbo-go/common/constant"
+	"github.com/apache/dubbo-go/common/extension"
+	"github.com/apache/dubbo-go/common/logger"
+	"github.com/apache/dubbo-go/config"
+	"github.com/apache/dubbo-go/registry"
+)
+
+const (
+	PageSize = "pageSize"
+	Enable   = "enable"
+)
+
+const (
+	CHECK_PASS_INTERVAL = "consul-check-pass-interval"
+	// default time-to-live in millisecond
+	DEFAULT_CHECK_PASS_INTERVAL = 16000
+	QUERY_TAG                   = "consul_query_tag"
+	ACL_TOKEN                   = "acl-token"
+	// default deregister critical server after
+	DEFAULT_DEREGISTER_TIME = "20s"
+	DEFAULT_WATCH_TIMEOUT   = 60 * 1000
+	WATCH_TIMEOUT           = "consul-watch-timeout"
+	DEREGISTER_AFTER        = "consul-deregister-critical-service-after"
+)

Review comment:
       What is the intended scope of these constants? If they are private, change the first character to lower case.

##########
File path: registry/consul/service_discovery.go
##########
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package consul
+
+import (
+	"fmt"
+	"strconv"
+	"strings"
+	"sync"
+	"time"
+)
+
+import (
+	"github.com/dubbogo/gost/container/set"
+	"github.com/dubbogo/gost/page"
+	consul "github.com/hashicorp/consul/api"
+	"github.com/hashicorp/consul/api/watch"
+	perrors "github.com/pkg/errors"
+)
+
+import (
+	"github.com/apache/dubbo-go/common"
+	"github.com/apache/dubbo-go/common/constant"
+	"github.com/apache/dubbo-go/common/extension"
+	"github.com/apache/dubbo-go/common/logger"
+	"github.com/apache/dubbo-go/config"
+	"github.com/apache/dubbo-go/registry"
+)
+
+const (
+	PageSize = "pageSize"
+	Enable   = "enable"
+)
+
+const (
+	CHECK_PASS_INTERVAL = "consul-check-pass-interval"
+	// default time-to-live in millisecond
+	DEFAULT_CHECK_PASS_INTERVAL = 16000
+	QUERY_TAG                   = "consul_query_tag"
+	ACL_TOKEN                   = "acl-token"
+	// default deregister critical server after
+	DEFAULT_DEREGISTER_TIME = "20s"
+	DEFAULT_WATCH_TIMEOUT   = 60 * 1000
+	WATCH_TIMEOUT           = "consul-watch-timeout"
+	DEREGISTER_AFTER        = "consul-deregister-critical-service-after"
+)
+
+var (
+	// 16 would be enough. We won't use concurrentMap because in most cases, there are not race condition
+	instanceMap = make(map[string]registry.ServiceDiscovery, 16)
+	initLock    sync.Mutex
+)
+
+// init will put the service discovery into extension
+func init() {
+	extension.SetServiceDiscovery(constant.CONSUL_KEY, newConsulServiceDiscovery)
+}
+
+// newConsulServiceDiscovery will create new service discovery instance
+// use double-check pattern to reduce race condition
+func newConsulServiceDiscovery(name string) (registry.ServiceDiscovery, error) {
+	instance, ok := instanceMap[name]
+	if ok {
+		return instance, nil
+	}
+
+	initLock.Lock()
+	defer initLock.Unlock()
+
+	// double check
+	instance, ok = instanceMap[name]

Review comment:
       I cannot find where you put this instance into the `map`.
   Am I missing something?

##########
File path: registry/consul/service_discovery.go
##########
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package consul
+
+import (
+	"fmt"
+	"strconv"
+	"strings"
+	"sync"
+	"time"
+)
+
+import (
+	"github.com/dubbogo/gost/container/set"
+	"github.com/dubbogo/gost/page"
+	consul "github.com/hashicorp/consul/api"
+	"github.com/hashicorp/consul/api/watch"
+	perrors "github.com/pkg/errors"
+)
+
+import (
+	"github.com/apache/dubbo-go/common"
+	"github.com/apache/dubbo-go/common/constant"
+	"github.com/apache/dubbo-go/common/extension"
+	"github.com/apache/dubbo-go/common/logger"
+	"github.com/apache/dubbo-go/config"
+	"github.com/apache/dubbo-go/registry"
+)
+
+const (
+	PageSize = "pageSize"
+	Enable   = "enable"
+)
+
+const (
+	CHECK_PASS_INTERVAL = "consul-check-pass-interval"
+	// default time-to-live in millisecond
+	DEFAULT_CHECK_PASS_INTERVAL = 16000
+	QUERY_TAG                   = "consul_query_tag"
+	ACL_TOKEN                   = "acl-token"
+	// default deregister critical server after
+	DEFAULT_DEREGISTER_TIME = "20s"
+	DEFAULT_WATCH_TIMEOUT   = 60 * 1000
+	WATCH_TIMEOUT           = "consul-watch-timeout"
+	DEREGISTER_AFTER        = "consul-deregister-critical-service-after"
+)
+
+var (
+	// 16 would be enough. We won't use concurrentMap because in most cases, there are not race condition
+	instanceMap = make(map[string]registry.ServiceDiscovery, 16)
+	initLock    sync.Mutex
+)
+
+// init will put the service discovery into extension
+func init() {
+	extension.SetServiceDiscovery(constant.CONSUL_KEY, newConsulServiceDiscovery)
+}
+
+// newConsulServiceDiscovery will create new service discovery instance
+// use double-check pattern to reduce race condition
+func newConsulServiceDiscovery(name string) (registry.ServiceDiscovery, error) {
+	instance, ok := instanceMap[name]
+	if ok {
+		return instance, nil
+	}
+
+	initLock.Lock()
+	defer initLock.Unlock()
+
+	// double check
+	instance, ok = instanceMap[name]
+	if ok {
+		return instance, nil
+	}
+
+	sdc, ok := config.GetBaseConfig().GetServiceDiscoveries(name)
+	if !ok || len(sdc.RemoteRef) == 0 {
+		return nil, perrors.New("could not init the instance because the config is invalid")
+	}
+
+	remoteConfig, ok := config.GetBaseConfig().GetRemoteConfig(sdc.RemoteRef)
+	if !ok {
+		return nil, perrors.New("could not find the remote config for name: " + sdc.RemoteRef)
+	}
+
+	descriptor := fmt.Sprintf("consul-service-discovery[%s]", remoteConfig.Address)
+
+	pageSize := 20
+	if remoteConfig.Params != nil {
+		if tmp, OK := remoteConfig.Params[PageSize]; OK {
+			intTmp, err := strconv.Atoi(tmp)
+			if err == nil && intTmp > 20 {
+				pageSize = intTmp
+			}
+		}
+	}
+	return &consulServiceDiscovery{
+		address:    remoteConfig.Address,
+		descriptor: descriptor,
+		PageSize:   pageSize,
+		ttl:        make(map[string]chan struct{}),
+	}, nil
+}
+
+// nacosServiceDiscovery is the implementation of service discovery based on nacos.
+// There is a problem, the go client for nacos does not support the id field.
+// we will use the metadata to store the id of ServiceInstance
+type consulServiceDiscovery struct {
+	group string
+	// descriptor is a short string about the basic information of this instance
+	descriptor string
+	// Consul client.
+	consulClient      *consul.Client
+	PageSize          int
+	serviceUrl        common.URL
+	checkPassInterval int64
+	tag               string
+	tags              []string
+	address           string
+	ttl               map[string]chan struct{}
+	*consul.Config
+}
+
+func (csd *consulServiceDiscovery) Init(registryURL common.URL) error {
+	csd.serviceUrl = registryURL
+	csd.checkPassInterval = registryURL.GetParamInt(CHECK_PASS_INTERVAL, DEFAULT_CHECK_PASS_INTERVAL)
+	csd.tag = registryURL.GetParam(QUERY_TAG, "")
+	csd.tags = strings.Split(registryURL.GetParam("tags", ""), ",")
+	aclToken := registryURL.GetParam(ACL_TOKEN, "")
+	csd.Config = &consul.Config{Address: csd.address, Token: aclToken}
+	client, err := consul.NewClient(csd.Config)
+	if err != nil {
+		return perrors.WithMessage(err, "create consul client failed.")
+	}
+	csd.consulClient = client
+	return nil
+}
+
+func (csd *consulServiceDiscovery) String() string {
+	return csd.descriptor
+}
+
+func (csd *consulServiceDiscovery) Destroy() error {
+	csd.consulClient = nil
+	for _, t := range csd.ttl {
+		close(t)
+	}
+	csd.ttl = nil
+	return nil
+}
+
+func (csd *consulServiceDiscovery) Register(instance registry.ServiceInstance) error {
+	ins, _ := csd.buildRegisterInstance(instance)
+	err := csd.consulClient.Agent().ServiceRegister(ins)
+	if err != nil {
+		return perrors.WithMessage(err, "consul could not register the instance. "+instance.GetServiceName())
+	}
+
+	return csd.registerTtl(instance)
+}
+
+func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance) error {
+	checkID := buildID(instance)
+
+	stopChan := make(chan struct{})
+	csd.ttl[buildID(instance)] = stopChan
+
+	period := time.Duration(csd.checkPassInterval/8) * time.Millisecond
+	timer := time.NewTimer(period)
+	go func() {
+		for {
+			select {
+			case <-timer.C:
+				timer.Reset(period)
+				err := csd.consulClient.Agent().PassTTL(checkID, "")
+				if err != nil {
+					logger.Warnf("pass ttl heartbeat fail:%v", err)
+					break
+				}
+				logger.Debugf("passed ttl heartbeat for %s", checkID)
+				break
+			case <-stopChan:
+				logger.Info("ttl %s for service %s is stopped", checkID, instance.GetServiceName())
+				return
+			}
+		}
+	}()
+	return nil
+}
+
+func (csd *consulServiceDiscovery) Update(instance registry.ServiceInstance) error {
+	ins, _ := csd.buildRegisterInstance(instance)
+	err := csd.consulClient.Agent().ServiceDeregister(buildID(instance))
+	if err != nil {
+		logger.Warnf("unregister instance %s fail:%v", instance.GetServiceName(), err)

Review comment:
       Should you return the error here?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org
For additional commands, e-mail: notifications-help@dubbo.apache.org