You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2019/02/15 06:14:06 UTC

[servicecomb-service-center] branch master updated: Issues-531 add registration and discovery for client (#534)

This is an automated email from the ASF dual-hosted git repository.

littlecui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new e1c525c  Issues-531 add registration and discovery for client (#534)
e1c525c is described below

commit e1c525cccfa020c9e65212326c76fc20df348cd6
Author: Chenzhu1008 <c5...@126.com>
AuthorDate: Fri Feb 15 14:14:01 2019 +0800

    Issues-531 add registration and discovery for client (#534)
    
    * add registration and discovery for client
    
    * adjust input parameters
---
 pkg/client/sc/instance.go     | 178 ++++++++++++++++++++++++++++++++++++++++++
 pkg/client/sc/microservice.go | 127 ++++++++++++++++++++++++++++++
 pkg/client/sc/watch.go        |  47 +++++++++++
 pkg/client/sc/websocket.go    |  39 +++++++++
 4 files changed, 391 insertions(+)

diff --git a/pkg/client/sc/instance.go b/pkg/client/sc/instance.go
new file mode 100644
index 0000000..96c7d31
--- /dev/null
+++ b/pkg/client/sc/instance.go
@@ -0,0 +1,178 @@
+package sc
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	"net/url"
+
+	"github.com/apache/servicecomb-service-center/server/core"
+	pb "github.com/apache/servicecomb-service-center/server/core/proto"
+	scerr "github.com/apache/servicecomb-service-center/server/error"
+)
+
+// REST path templates for the instance-related endpoints used in this file.
+// Each %s is filled in with fmt.Sprintf at the call site; the first argument
+// is always the project name, and apiInstanceHeartbeatURL additionally takes
+// the service ID and the instance ID, in that order.
+const (
+	apiDiscoveryInstancesURL = "/v4/%s/registry/instances"
+	apiHeartbeatSetURL       = "/v4/%s/registry/heartbeats"
+	apiInstanceHeartbeatURL  = "/v4/%s/registry/microservices/%s/instances/%s/heartbeat"
+)
+
+func (c *SCClient) RegisterInstance(ctx context.Context, domainProject, serviceId string, instance *pb.MicroServiceInstance) (string, *scerr.Error) {
+	domain, project := core.FromDomainProject(domainProject)
+	headers := c.CommonHeaders(ctx)
+	headers.Set("X-Domain-Name", domain)
+
+	reqBody, err := json.Marshal(&pb.RegisterInstanceRequest{Instance: instance})
+	if err != nil {
+		return "", scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+
+	resp, err := c.RestDoWithContext(ctx, http.MethodPost,
+		fmt.Sprintf(apiInstancesURL, project, serviceId),
+		headers, reqBody)
+	if err != nil {
+		return "", scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+	defer resp.Body.Close()
+
+	body, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		return "", scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+
+	if resp.StatusCode != http.StatusOK {
+		return "", c.toError(body)
+	}
+
+	instancesResp := &pb.RegisterInstanceResponse{}
+	err = json.Unmarshal(body, instancesResp)
+	if err != nil {
+		return "", scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+	return instancesResp.InstanceId, nil
+}
+
+// UnregisterInstance sends a DELETE for the instance identified by serviceId
+// and instanceId and returns nil when the server answers 200 OK.
+//
+// The domain part of domainProject goes into the X-Domain-Name header and the
+// project part into the URL path. Transport and read failures are reported as
+// scerr.ErrInternal; any non-200 response body is converted with c.toError.
+func (c *SCClient) UnregisterInstance(ctx context.Context, domainProject, serviceId, instanceId string) *scerr.Error {
+	domain, project := core.FromDomainProject(domainProject)
+	hdr := c.CommonHeaders(ctx)
+	hdr.Set("X-Domain-Name", domain)
+
+	api := fmt.Sprintf(apiInstanceURL, project, serviceId, instanceId)
+	resp, err := c.RestDoWithContext(ctx, http.MethodDelete, api, hdr, nil)
+	if err != nil {
+		return scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+	defer resp.Body.Close()
+
+	raw, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		return scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+
+	if resp.StatusCode == http.StatusOK {
+		return nil
+	}
+	return c.toError(raw)
+}
+
+func (c *SCClient) DiscoveryInstances(ctx context.Context, domainProject, consumerId, providerAppId, providerServiceName, providerVersionRule string) ([]*pb.MicroServiceInstance, *scerr.Error) {
+	domain, project := core.FromDomainProject(domainProject)
+	headers := c.CommonHeaders(ctx)
+	headers.Set("X-Domain-Name", domain)
+	headers.Set("X-ConsumerId", consumerId)
+
+	query := url.Values{}
+	query.Set("appId", providerAppId)
+	query.Set("serviceName", providerServiceName)
+	query.Set("version", providerVersionRule)
+
+	resp, err := c.RestDoWithContext(ctx, http.MethodGet,
+		fmt.Sprintf(apiDiscoveryInstancesURL, project)+"?"+c.parseQuery(ctx)+"&"+query.Encode(),
+		headers, nil)
+	if err != nil {
+		return nil, scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+	defer resp.Body.Close()
+
+	body, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		return nil, scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+
+	if resp.StatusCode != http.StatusOK {
+		return nil, c.toError(body)
+	}
+
+	instancesResp := &pb.GetInstancesResponse{}
+	err = json.Unmarshal(body, instancesResp)
+	if err != nil {
+		return nil, scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+
+	return instancesResp.Instances, nil
+}
+
+// Heartbeat sends a PUT to the heartbeat endpoint of the instance identified
+// by serviceId and instanceId and returns nil when the server answers 200 OK.
+//
+// The domain part of domainProject goes into the X-Domain-Name header and the
+// project part into the URL path. Transport and read failures are reported as
+// scerr.ErrInternal; any non-200 response body is converted with c.toError.
+func (c *SCClient) Heartbeat(ctx context.Context, domainProject, serviceId, instanceId string) *scerr.Error {
+	domain, project := core.FromDomainProject(domainProject)
+	hdr := c.CommonHeaders(ctx)
+	hdr.Set("X-Domain-Name", domain)
+
+	api := fmt.Sprintf(apiInstanceHeartbeatURL, project, serviceId, instanceId)
+	resp, err := c.RestDoWithContext(ctx, http.MethodPut, api, hdr, nil)
+	if err != nil {
+		return scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+	defer resp.Body.Close()
+
+	raw, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		return scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+
+	if resp.StatusCode == http.StatusOK {
+		return nil
+	}
+	return c.toError(raw)
+}
+
+func (c *SCClient) HeartbeatSet(ctx context.Context, domainProject string, instances ...*pb.HeartbeatSetElement) ([]*pb.InstanceHbRst, *scerr.Error) {
+	domain, project := core.FromDomainProject(domainProject)
+	headers := c.CommonHeaders(ctx)
+	headers.Set("X-Domain-Name", domain)
+
+	reqBody, err := json.Marshal(&pb.HeartbeatSetRequest{Instances: instances})
+	if err != nil {
+		return nil, scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+
+	resp, err := c.RestDoWithContext(ctx, http.MethodPut,
+		fmt.Sprintf(apiHeartbeatSetURL, project),
+		headers, reqBody)
+	if err != nil {
+		return nil, scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+	defer resp.Body.Close()
+
+	body, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		return nil, scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+
+	if resp.StatusCode != http.StatusOK {
+		return nil, c.toError(body)
+	}
+
+	instancesResp := &pb.HeartbeatSetResponse{}
+	err = json.Unmarshal(body, instancesResp)
+	if err != nil {
+		return nil, scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+
+	return instancesResp.Instances, nil
+}
diff --git a/pkg/client/sc/microservice.go b/pkg/client/sc/microservice.go
new file mode 100644
index 0000000..05a2db2
--- /dev/null
+++ b/pkg/client/sc/microservice.go
@@ -0,0 +1,127 @@
+package sc
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	"net/url"
+
+	"github.com/apache/servicecomb-service-center/server/core"
+	pb "github.com/apache/servicecomb-service-center/server/core/proto"
+	scerr "github.com/apache/servicecomb-service-center/server/error"
+)
+
+// REST path templates for the microservice-related endpoints used in this
+// file. Each %s is filled in with fmt.Sprintf at the call site; the first
+// argument is always the project name, and apiMicroServiceURL additionally
+// takes the service ID.
+const (
+	apiExistenceURL     = "/v4/%s/registry/existence"
+	apiMicroServicesURL = "/v4/%s/registry/microservices"
+	apiMicroServiceURL  = "/v4/%s/registry/microservices/%s"
+)
+
+func (c *SCClient) CreateService(ctx context.Context, domainProject string, service *pb.MicroService) (string, *scerr.Error) {
+	domain, project := core.FromDomainProject(domainProject)
+	headers := c.CommonHeaders(ctx)
+	headers.Set("X-Domain-Name", domain)
+
+	reqBody, err := json.Marshal(&pb.CreateServiceRequest{Service: service})
+	if err != nil {
+		return "", scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+
+	resp, err := c.RestDoWithContext(ctx, http.MethodPost,
+		fmt.Sprintf(apiMicroServicesURL, project),
+		headers, reqBody)
+	if err != nil {
+		return "", scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+	defer resp.Body.Close()
+
+	body, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		return "", scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+
+	if resp.StatusCode != http.StatusOK {
+		return "", c.toError(body)
+	}
+
+	serviceResp := &pb.CreateServiceResponse{}
+	err = json.Unmarshal(body, serviceResp)
+	if err != nil {
+		return "", scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+	return serviceResp.ServiceId, nil
+}
+
+// DeleteService sends a DELETE for the microservice identified by serviceId
+// and returns nil when the server answers 200 OK.
+//
+// The domain part of domainProject goes into the X-Domain-Name header and the
+// project part into the URL path. Transport and read failures are reported as
+// scerr.ErrInternal; any non-200 response body is converted with c.toError.
+func (c *SCClient) DeleteService(ctx context.Context, domainProject, serviceId string) *scerr.Error {
+	domain, project := core.FromDomainProject(domainProject)
+	hdr := c.CommonHeaders(ctx)
+	hdr.Set("X-Domain-Name", domain)
+
+	api := fmt.Sprintf(apiMicroServiceURL, project, serviceId)
+	resp, err := c.RestDoWithContext(ctx, http.MethodDelete, api, hdr, nil)
+	if err != nil {
+		return scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+	defer resp.Body.Close()
+
+	raw, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		return scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+
+	if resp.StatusCode == http.StatusOK {
+		return nil
+	}
+	return c.toError(raw)
+}
+
+// ServiceExistence queries the existence endpoint with type=microservice and
+// the given env, appId, serviceName and version parameters, and returns the
+// ServiceId from the response. Any error from c.existence is returned as is.
+func (c *SCClient) ServiceExistence(ctx context.Context, domainProject string, appId, serviceName, versionRule, env string) (string, *scerr.Error) {
+	params := url.Values{
+		"type":        {"microservice"},
+		"env":         {env},
+		"appId":       {appId},
+		"serviceName": {serviceName},
+		"version":     {versionRule},
+	}
+
+	found, scErr := c.existence(ctx, domainProject, params)
+	if scErr != nil {
+		return "", scErr
+	}
+	return found.ServiceId, nil
+}
+
+// existence issues a GET against the existence endpoint with the supplied
+// query parameters, appended after whatever c.parseQuery(ctx) yields, and
+// returns the decoded pb.GetExistenceResponse.
+//
+// The domain part of domainProject goes into the X-Domain-Name header and the
+// project part into the URL path. Transport, read and decode failures are
+// reported as scerr.ErrInternal; any non-200 response body is converted with
+// c.toError.
+func (c *SCClient) existence(ctx context.Context, domainProject string, query url.Values) (*pb.GetExistenceResponse, *scerr.Error) {
+	domain, project := core.FromDomainProject(domainProject)
+	hdr := c.CommonHeaders(ctx)
+	hdr.Set("X-Domain-Name", domain)
+
+	api := fmt.Sprintf(apiExistenceURL, project) + "?" + c.parseQuery(ctx) + "&" + query.Encode()
+	resp, err := c.RestDoWithContext(ctx, http.MethodGet, api, hdr, nil)
+	if err != nil {
+		return nil, scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+	defer resp.Body.Close()
+
+	raw, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		return nil, scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+	if resp.StatusCode != http.StatusOK {
+		return nil, c.toError(raw)
+	}
+
+	result := new(pb.GetExistenceResponse)
+	if err = json.Unmarshal(raw, result); err != nil {
+		return nil, scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+	return result, nil
+}
diff --git a/pkg/client/sc/watch.go b/pkg/client/sc/watch.go
new file mode 100644
index 0000000..15af3e8
--- /dev/null
+++ b/pkg/client/sc/watch.go
@@ -0,0 +1,47 @@
+package sc
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"log"
+
+	"github.com/apache/servicecomb-service-center/server/core"
+	pb "github.com/apache/servicecomb-service-center/server/core/proto"
+	scerr "github.com/apache/servicecomb-service-center/server/error"
+	"github.com/gorilla/websocket"
+)
+
+// Websocket path templates for watching instance changes. The two %s verbs
+// are filled with the project name and the watching service's ID, in that
+// order. Only apiWatcherURL is referenced in this file.
+const (
+	apiWatcherURL        = "/v4/%s/registry/microservices/%s/watcher"
+	apiListAndWatcherURL = "/v4/%s/registry/microservices/%s/listwatcher"
+)
+
+// Watch opens a websocket to the watcher endpoint of selfServiceId and blocks,
+// invoking callback once for every text message that decodes into a
+// pb.WatchInstanceResponse. Non-text messages are ignored.
+//
+// The domain part of domainProject goes into the X-Domain-Name header and the
+// project part into the URL path. Watch only returns when dialing fails, when
+// reading from the connection fails, or when a message cannot be decoded; in
+// every case the cause is logged and reported as scerr.ErrInternal. The
+// connection is closed before Watch returns.
+func (c *SCClient) Watch(ctx context.Context, domainProject, selfServiceId string, callback func(*pb.WatchInstanceResponse)) *scerr.Error {
+	domain, project := core.FromDomainProject(domainProject)
+	headers := c.CommonHeaders(ctx)
+	headers.Set("X-Domain-Name", domain)
+
+	conn, err := c.WebsocketDial(ctx, fmt.Sprintf(apiWatcherURL, project, selfServiceId), headers)
+	if err != nil {
+		return scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+	// Guard against a dial that reports success without a connection, which
+	// would otherwise panic on the first ReadMessage call.
+	if conn == nil {
+		return scerr.NewError(scerr.ErrInternal, "websocket dial returned no connection")
+	}
+	defer conn.Close()
+
+	for {
+		// Return the loop-local error directly. It used to be shadowed by
+		// ":=" and lost on break, so the trailing return dereferenced the
+		// outer (nil) dial error and panicked.
+		messageType, message, err := conn.ReadMessage()
+		if err != nil {
+			log.Println(err)
+			return scerr.NewError(scerr.ErrInternal, err.Error())
+		}
+		if messageType != websocket.TextMessage {
+			continue
+		}
+
+		data := &pb.WatchInstanceResponse{}
+		if err := json.Unmarshal(message, data); err != nil {
+			log.Println(err)
+			return scerr.NewError(scerr.ErrInternal, err.Error())
+		}
+		callback(data)
+	}
+}
diff --git a/pkg/client/sc/websocket.go b/pkg/client/sc/websocket.go
new file mode 100644
index 0000000..2b7f68e
--- /dev/null
+++ b/pkg/client/sc/websocket.go
@@ -0,0 +1,39 @@
+package sc
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"github.com/apache/servicecomb-service-center/pkg/util"
+	"net/http"
+	"net/url"
+
+	"github.com/gorilla/websocket"
+)
+
+// WebsocketDial tries up to c.Retries times to open a websocket to api on the
+// endpoint returned by c.Next(), and returns the first connection that is
+// established.
+//
+// An "https" endpoint is dialed as "wss"; every other scheme is dialed as
+// "ws". headers are sent with the handshake and c.TLS is used as the TLS
+// client configuration. When every attempt fails, the returned error joins
+// the per-endpoint failures; when no attempt is made because c.Retries is
+// not positive, an error is returned instead of a nil connection.
+//
+// NOTE(review): ctx is not passed to the dialer, so cancellation does not
+// interrupt an in-flight dial — confirm whether that is intended.
+func (c *LBClient) WebsocketDial(ctx context.Context, api string, headers http.Header) (conn *websocket.Conn, err error) {
+	dialer := &websocket.Dialer{TLSClientConfig: c.TLS}
+	var errs []string
+	for i := 0; i < c.Retries; i++ {
+		endpoint := c.Next()
+		var addr *url.URL
+		addr, err = url.Parse(endpoint)
+		if err != nil {
+			// addr is nil on a parse failure, so report the raw endpoint
+			// rather than formatting a nil *url.URL.
+			errs = append(errs, fmt.Sprintf("[%s]: %s", endpoint, err.Error()))
+			continue
+		}
+		if addr.Scheme == "https" {
+			addr.Scheme = "wss"
+		} else {
+			addr.Scheme = "ws"
+		}
+		conn, _, err = dialer.Dial(addr.String()+api, headers)
+		if err == nil {
+			return conn, nil
+		}
+		errs = append(errs, fmt.Sprintf("[%s]: %s", addr, err.Error()))
+	}
+	if len(errs) == 0 {
+		// The loop body never ran; fail loudly instead of returning (nil, nil).
+		return nil, fmt.Errorf("websocket dial not attempted: retries is %d", c.Retries)
+	}
+	return nil, errors.New(util.StringJoin(errs, ", "))
+}