You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2019/02/15 06:14:06 UTC
[servicecomb-service-center] branch master updated: Issues-531 add
registration and discovery for client (#534)
This is an automated email from the ASF dual-hosted git repository.
littlecui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new e1c525c Issues-531 add registration and discovery for client (#534)
e1c525c is described below
commit e1c525cccfa020c9e65212326c76fc20df348cd6
Author: Chenzhu1008 <c5...@126.com>
AuthorDate: Fri Feb 15 14:14:01 2019 +0800
Issues-531 add registration and discovery for client (#534)
* add registration and discovery for client
* adjust input parameters
---
pkg/client/sc/instance.go | 178 ++++++++++++++++++++++++++++++++++++++++++
pkg/client/sc/microservice.go | 127 ++++++++++++++++++++++++++++++
pkg/client/sc/watch.go | 47 +++++++++++
pkg/client/sc/websocket.go | 39 +++++++++
4 files changed, 391 insertions(+)
diff --git a/pkg/client/sc/instance.go b/pkg/client/sc/instance.go
new file mode 100644
index 0000000..96c7d31
--- /dev/null
+++ b/pkg/client/sc/instance.go
@@ -0,0 +1,178 @@
+package sc
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+ "net/url"
+
+ "github.com/apache/servicecomb-service-center/server/core"
+ pb "github.com/apache/servicecomb-service-center/server/core/proto"
+ scerr "github.com/apache/servicecomb-service-center/server/error"
+)
+
+// REST path templates used by the instance APIs in this file. Each %s is
+// filled in with fmt.Sprintf by the method that uses the template.
+const (
+ // Filled with the project by DiscoveryInstances.
+ apiDiscoveryInstancesURL = "/v4/%s/registry/instances"
+ // Filled with the project by HeartbeatSet.
+ apiHeartbeatSetURL = "/v4/%s/registry/heartbeats"
+ // Filled with the project, service ID and instance ID by Heartbeat.
+ apiInstanceHeartbeatURL = "/v4/%s/registry/microservices/%s/instances/%s/heartbeat"
+)
+
+// RegisterInstance registers instance under the micro-service serviceId and
+// returns the instance ID found in the service center's response.
+//
+// domainProject is split by core.FromDomainProject: the domain is sent in the
+// X-Domain-Name header and the project becomes part of the request path.
+// Marshalling, transport, body-read and decoding failures are reported as
+// scerr.ErrInternal; a status other than 200 OK is converted from the
+// response body by c.toError.
+func (c *SCClient) RegisterInstance(ctx context.Context, domainProject, serviceId string, instance *pb.MicroServiceInstance) (string, *scerr.Error) {
+	domain, project := core.FromDomainProject(domainProject)
+	header := c.CommonHeaders(ctx)
+	header.Set("X-Domain-Name", domain)
+
+	payload, err := json.Marshal(&pb.RegisterInstanceRequest{Instance: instance})
+	if err != nil {
+		return "", scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+
+	api := fmt.Sprintf(apiInstancesURL, project, serviceId)
+	resp, err := c.RestDoWithContext(ctx, http.MethodPost, api, header, payload)
+	if err != nil {
+		return "", scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+	defer resp.Body.Close()
+
+	raw, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		return "", scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+	if resp.StatusCode != http.StatusOK {
+		return "", c.toError(raw)
+	}
+
+	registered := new(pb.RegisterInstanceResponse)
+	if err = json.Unmarshal(raw, registered); err != nil {
+		return "", scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+	return registered.InstanceId, nil
+}
+
+// UnregisterInstance removes the instance instanceId of the micro-service
+// serviceId by issuing a DELETE request to the service center.
+//
+// The domain part of domainProject is sent in the X-Domain-Name header and
+// the project part is placed in the request path. It returns nil on 200 OK,
+// the error decoded by c.toError on any other status, and scerr.ErrInternal
+// when the request or reading the response body fails.
+func (c *SCClient) UnregisterInstance(ctx context.Context, domainProject, serviceId, instanceId string) *scerr.Error {
+	domain, project := core.FromDomainProject(domainProject)
+	header := c.CommonHeaders(ctx)
+	header.Set("X-Domain-Name", domain)
+
+	api := fmt.Sprintf(apiInstanceURL, project, serviceId, instanceId)
+	resp, err := c.RestDoWithContext(ctx, http.MethodDelete, api, header, nil)
+	if err != nil {
+		return scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+	defer resp.Body.Close()
+
+	raw, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		return scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+	if resp.StatusCode == http.StatusOK {
+		return nil
+	}
+	return c.toError(raw)
+}
+
+// DiscoveryInstances looks up the instances of a provider service on behalf
+// of the consumer consumerId.
+//
+// The provider is selected through the appId, serviceName and version query
+// parameters, which are appended after the query string produced by
+// c.parseQuery(ctx). The consumer ID is sent in the X-ConsumerId header and
+// the domain part of domainProject in X-Domain-Name. It returns the Instances
+// field of the decoded response. Transport, body-read and decoding failures
+// are reported as scerr.ErrInternal; a status other than 200 OK is converted
+// from the response body by c.toError.
+func (c *SCClient) DiscoveryInstances(ctx context.Context, domainProject, consumerId, providerAppId, providerServiceName, providerVersionRule string) ([]*pb.MicroServiceInstance, *scerr.Error) {
+	domain, project := core.FromDomainProject(domainProject)
+	header := c.CommonHeaders(ctx)
+	header.Set("X-Domain-Name", domain)
+	header.Set("X-ConsumerId", consumerId)
+
+	provider := url.Values{
+		"appId":       []string{providerAppId},
+		"serviceName": []string{providerServiceName},
+		"version":     []string{providerVersionRule},
+	}
+
+	api := fmt.Sprintf(apiDiscoveryInstancesURL, project) + "?" + c.parseQuery(ctx) + "&" + provider.Encode()
+	resp, err := c.RestDoWithContext(ctx, http.MethodGet, api, header, nil)
+	if err != nil {
+		return nil, scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+	defer resp.Body.Close()
+
+	raw, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		return nil, scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+	if resp.StatusCode != http.StatusOK {
+		return nil, c.toError(raw)
+	}
+
+	found := new(pb.GetInstancesResponse)
+	if err = json.Unmarshal(raw, found); err != nil {
+		return nil, scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+	return found.Instances, nil
+}
+
+// Heartbeat sends a heartbeat for the instance instanceId of the
+// micro-service serviceId by issuing a PUT request without a body.
+//
+// The domain part of domainProject is sent in the X-Domain-Name header and
+// the project part is placed in the request path. It returns nil on 200 OK,
+// the error decoded by c.toError on any other status, and scerr.ErrInternal
+// when the request or reading the response body fails.
+func (c *SCClient) Heartbeat(ctx context.Context, domainProject, serviceId, instanceId string) *scerr.Error {
+	domain, project := core.FromDomainProject(domainProject)
+	header := c.CommonHeaders(ctx)
+	header.Set("X-Domain-Name", domain)
+
+	api := fmt.Sprintf(apiInstanceHeartbeatURL, project, serviceId, instanceId)
+	resp, err := c.RestDoWithContext(ctx, http.MethodPut, api, header, nil)
+	if err != nil {
+		return scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+	defer resp.Body.Close()
+
+	raw, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		return scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+	if resp.StatusCode == http.StatusOK {
+		return nil
+	}
+	return c.toError(raw)
+}
+
+// HeartbeatSet sends heartbeats for several instances in a single PUT
+// request and returns the Instances field of the decoded response.
+//
+// The domain part of domainProject is sent in the X-Domain-Name header and
+// the project part is placed in the request path. Marshalling, transport,
+// body-read and decoding failures are reported as scerr.ErrInternal; a status
+// other than 200 OK is converted from the response body by c.toError.
+func (c *SCClient) HeartbeatSet(ctx context.Context, domainProject string, instances ...*pb.HeartbeatSetElement) ([]*pb.InstanceHbRst, *scerr.Error) {
+	domain, project := core.FromDomainProject(domainProject)
+	header := c.CommonHeaders(ctx)
+	header.Set("X-Domain-Name", domain)
+
+	payload, err := json.Marshal(&pb.HeartbeatSetRequest{Instances: instances})
+	if err != nil {
+		return nil, scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+
+	api := fmt.Sprintf(apiHeartbeatSetURL, project)
+	resp, err := c.RestDoWithContext(ctx, http.MethodPut, api, header, payload)
+	if err != nil {
+		return nil, scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+	defer resp.Body.Close()
+
+	raw, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		return nil, scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+	if resp.StatusCode != http.StatusOK {
+		return nil, c.toError(raw)
+	}
+
+	results := new(pb.HeartbeatSetResponse)
+	if err = json.Unmarshal(raw, results); err != nil {
+		return nil, scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+	return results.Instances, nil
+}
diff --git a/pkg/client/sc/microservice.go b/pkg/client/sc/microservice.go
new file mode 100644
index 0000000..05a2db2
--- /dev/null
+++ b/pkg/client/sc/microservice.go
@@ -0,0 +1,127 @@
+package sc
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+ "net/url"
+
+ "github.com/apache/servicecomb-service-center/server/core"
+ pb "github.com/apache/servicecomb-service-center/server/core/proto"
+ scerr "github.com/apache/servicecomb-service-center/server/error"
+)
+
+// REST path templates used by the micro-service APIs in this file. Each %s
+// is filled in with fmt.Sprintf by the method that uses the template.
+const (
+ // Filled with the project by existence.
+ apiExistenceURL = "/v4/%s/registry/existence"
+ // Filled with the project by CreateService.
+ apiMicroServicesURL = "/v4/%s/registry/microservices"
+ // Filled with the project and service ID by DeleteService.
+ apiMicroServiceURL = "/v4/%s/registry/microservices/%s"
+)
+
+// CreateService registers service with the service center and returns the
+// service ID found in the response.
+//
+// The domain part of domainProject is sent in the X-Domain-Name header and
+// the project part is placed in the request path. Marshalling, transport,
+// body-read and decoding failures are reported as scerr.ErrInternal; a status
+// other than 200 OK is converted from the response body by c.toError.
+func (c *SCClient) CreateService(ctx context.Context, domainProject string, service *pb.MicroService) (string, *scerr.Error) {
+	domain, project := core.FromDomainProject(domainProject)
+	header := c.CommonHeaders(ctx)
+	header.Set("X-Domain-Name", domain)
+
+	payload, err := json.Marshal(&pb.CreateServiceRequest{Service: service})
+	if err != nil {
+		return "", scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+
+	api := fmt.Sprintf(apiMicroServicesURL, project)
+	resp, err := c.RestDoWithContext(ctx, http.MethodPost, api, header, payload)
+	if err != nil {
+		return "", scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+	defer resp.Body.Close()
+
+	raw, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		return "", scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+	if resp.StatusCode != http.StatusOK {
+		return "", c.toError(raw)
+	}
+
+	created := new(pb.CreateServiceResponse)
+	if err = json.Unmarshal(raw, created); err != nil {
+		return "", scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+	return created.ServiceId, nil
+}
+
+// DeleteService removes the micro-service serviceId by issuing a DELETE
+// request to the service center.
+//
+// The domain part of domainProject is sent in the X-Domain-Name header and
+// the project part is placed in the request path. It returns nil on 200 OK,
+// the error decoded by c.toError on any other status, and scerr.ErrInternal
+// when the request or reading the response body fails.
+func (c *SCClient) DeleteService(ctx context.Context, domainProject, serviceId string) *scerr.Error {
+	domain, project := core.FromDomainProject(domainProject)
+	header := c.CommonHeaders(ctx)
+	header.Set("X-Domain-Name", domain)
+
+	api := fmt.Sprintf(apiMicroServiceURL, project, serviceId)
+	resp, err := c.RestDoWithContext(ctx, http.MethodDelete, api, header, nil)
+	if err != nil {
+		return scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+	defer resp.Body.Close()
+
+	raw, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		return scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+	if resp.StatusCode == http.StatusOK {
+		return nil
+	}
+	return c.toError(raw)
+}
+
+// ServiceExistence queries the existence API for a micro-service identified
+// by env, appId, serviceName and versionRule and returns the ServiceId field
+// of the response.
+//
+// The lookup is delegated to c.existence with the query parameter "type" set
+// to "microservice"; any error from that call is returned unchanged together
+// with an empty ID.
+func (c *SCClient) ServiceExistence(ctx context.Context, domainProject string, appId, serviceName, versionRule, env string) (string, *scerr.Error) {
+	params := url.Values{
+		"type":        []string{"microservice"},
+		"env":         []string{env},
+		"appId":       []string{appId},
+		"serviceName": []string{serviceName},
+		"version":     []string{versionRule},
+	}
+
+	found, scErr := c.existence(ctx, domainProject, params)
+	if scErr != nil {
+		return "", scErr
+	}
+	return found.ServiceId, nil
+}
+
+// existence performs a GET on the existence API and returns the decoded
+// response.
+//
+// query is encoded and appended after the query string produced by
+// c.parseQuery(ctx). The domain part of domainProject is sent in the
+// X-Domain-Name header and the project part is placed in the request path.
+// Transport, body-read and decoding failures are reported as
+// scerr.ErrInternal; a status other than 200 OK is converted from the
+// response body by c.toError.
+func (c *SCClient) existence(ctx context.Context, domainProject string, query url.Values) (*pb.GetExistenceResponse, *scerr.Error) {
+	domain, project := core.FromDomainProject(domainProject)
+	header := c.CommonHeaders(ctx)
+	header.Set("X-Domain-Name", domain)
+
+	api := fmt.Sprintf(apiExistenceURL, project) + "?" + c.parseQuery(ctx) + "&" + query.Encode()
+	resp, err := c.RestDoWithContext(ctx, http.MethodGet, api, header, nil)
+	if err != nil {
+		return nil, scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+	defer resp.Body.Close()
+
+	raw, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		return nil, scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+	if resp.StatusCode != http.StatusOK {
+		return nil, c.toError(raw)
+	}
+
+	result := new(pb.GetExistenceResponse)
+	if err = json.Unmarshal(raw, result); err != nil {
+		return nil, scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+	return result, nil
+}
diff --git a/pkg/client/sc/watch.go b/pkg/client/sc/watch.go
new file mode 100644
index 0000000..15af3e8
--- /dev/null
+++ b/pkg/client/sc/watch.go
@@ -0,0 +1,47 @@
+package sc
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "log"
+
+ "github.com/apache/servicecomb-service-center/server/core"
+ pb "github.com/apache/servicecomb-service-center/server/core/proto"
+ scerr "github.com/apache/servicecomb-service-center/server/error"
+ "github.com/gorilla/websocket"
+)
+
+// Websocket path templates for watching a micro-service.
+const (
+ // Filled with the project and the watching service's ID by Watch.
+ apiWatcherURL = "/v4/%s/registry/microservices/%s/watcher"
+ // Not referenced by the code visible in this file; presumably takes the
+ // same two placeholders as apiWatcherURL — TODO confirm against its caller.
+ apiListAndWatcherURL = "/v4/%s/registry/microservices/%s/listwatcher"
+)
+
+// Watch opens a websocket to the watcher API of the micro-service
+// selfServiceId and invokes callback for every text message received, after
+// decoding it as a pb.WatchInstanceResponse.
+//
+// The domain part of domainProject is sent in the X-Domain-Name header and
+// the project part is placed in the request path. Watch blocks until dialing,
+// reading or decoding fails and then returns that failure as
+// scerr.ErrInternal; it never returns nil. Messages that are not text
+// messages are skipped. The connection is closed before Watch returns.
+//
+// NOTE(review): ctx is only passed to CommonHeaders and WebsocketDial; the
+// read loop itself does not observe ctx cancellation.
+func (c *SCClient) Watch(ctx context.Context, domainProject, selfServiceId string, callback func(*pb.WatchInstanceResponse)) *scerr.Error {
+	domain, project := core.FromDomainProject(domainProject)
+	headers := c.CommonHeaders(ctx)
+	headers.Set("X-Domain-Name", domain)
+
+	conn, err := c.WebsocketDial(ctx, fmt.Sprintf(apiWatcherURL, project, selfServiceId), headers)
+	if err != nil {
+		return scerr.NewError(scerr.ErrInternal, err.Error())
+	}
+	// Release the socket on every exit path; previously it was never closed.
+	defer conn.Close()
+
+	for {
+		messageType, message, err := conn.ReadMessage()
+		if err != nil {
+			log.Println(err)
+			// Return from inside the loop: this err shadows the dial error
+			// above, which is nil here and must not be dereferenced.
+			return scerr.NewError(scerr.ErrInternal, err.Error())
+		}
+		if messageType != websocket.TextMessage {
+			continue
+		}
+
+		data := &pb.WatchInstanceResponse{}
+		if err := json.Unmarshal(message, data); err != nil {
+			log.Println(err)
+			return scerr.NewError(scerr.ErrInternal, err.Error())
+		}
+		callback(data)
+	}
+}
diff --git a/pkg/client/sc/websocket.go b/pkg/client/sc/websocket.go
new file mode 100644
index 0000000..2b7f68e
--- /dev/null
+++ b/pkg/client/sc/websocket.go
@@ -0,0 +1,39 @@
+package sc
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "github.com/apache/servicecomb-service-center/pkg/util"
+ "net/http"
+ "net/url"
+
+ "github.com/gorilla/websocket"
+)
+
+// WebsocketDial opens a websocket connection to the path api on one of the
+// client's endpoints, making up to c.Retries attempts with the endpoint
+// returned by c.Next() for each attempt.
+//
+// An "https" endpoint is dialed as "wss"; every other scheme is dialed as
+// "ws". headers are sent with the handshake request and c.TLS is used as the
+// TLS client configuration. On success the connection is returned with a nil
+// error. When every attempt fails, the returned error lists each failed
+// attempt. A non-positive c.Retries is reported as an error rather than
+// returning a nil connection together with a nil error.
+//
+// NOTE(review): ctx is accepted but not used by the dial, so cancelling it
+// does not abort an in-flight handshake.
+func (c *LBClient) WebsocketDial(ctx context.Context, api string, headers http.Header) (conn *websocket.Conn, err error) {
+	if c.Retries <= 0 {
+		// Without this guard the loop below never runs and callers would
+		// receive (nil, nil) and dereference a nil connection.
+		return nil, fmt.Errorf("websocket dial %s: no attempt made, retries is %d", api, c.Retries)
+	}
+
+	dialer := &websocket.Dialer{TLSClientConfig: c.TLS}
+	var errs []string
+	for i := 0; i < c.Retries; i++ {
+		endpoint := c.Next()
+		addr, parseErr := url.Parse(endpoint)
+		if parseErr != nil {
+			// addr is nil here, so report the raw endpoint instead.
+			errs = append(errs, fmt.Sprintf("[%s]: %s", endpoint, parseErr.Error()))
+			continue
+		}
+		if addr.Scheme == "https" {
+			addr.Scheme = "wss"
+		} else {
+			addr.Scheme = "ws"
+		}
+
+		conn, _, err = dialer.Dial(addr.String()+api, headers)
+		if err == nil {
+			return conn, nil
+		}
+		errs = append(errs, fmt.Sprintf("[%s]: %s", addr, err.Error()))
+	}
+	return nil, errors.New(util.StringJoin(errs, ", "))
+}