You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2022/06/22 02:14:27 UTC
[dubbo-go] branch 3.0 updated: feat: Grpc based Health check. (#1935)
This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 48fad4bd9 feat: Grpc based Health check. (#1935)
48fad4bd9 is described below
commit 48fad4bd98479cb24d421b5b1801685420de5b70
Author: Stonex <43...@users.noreply.github.com>
AuthorDate: Wed Jun 22 10:14:19 2022 +0800
feat: Grpc based Health check. (#1935)
* feat: Add health check service. Health check service starts with dubbo.
* Use grpc health check proto, ref: https://github.com/grpc-ecosystem/grpc-health-probe
* Fix test config
* Current health check only support triple protocol.
* chore: Change inner serivice name.
---
common/constant/key.go | 8 +-
config/provider_config.go | 18 +-
config/provider_config_test.go | 3 +-
imports/imports.go | 1 +
protocol/dubbo3/health/serverhealth.go | 185 ++++++++++++
.../dubbo3/health/triple_health_v1/health.pb.go | 314 +++++++++++++++++++++
.../dubbo3/health/triple_health_v1/health.proto | 62 ++++
.../health/triple_health_v1/health_triple.pb.go | 286 +++++++++++++++++++
protocol/dubbo3/reflection/serverreflection.go | 30 +-
registry/protocol/protocol.go | 4 +
10 files changed, 892 insertions(+), 19 deletions(-)
diff --git a/common/constant/key.go b/common/constant/key.go
index 51ca45375..4fd53cdb7 100644
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@ -371,6 +371,12 @@ const (
// reflection service
const (
- ReflectionServiceTypeName = "XXX_serverReflectionServer"
+ ReflectionServiceTypeName = "DubbogoServerReflectionServer"
ReflectionServiceInterface = "grpc.reflection.v1alpha.ServerReflection"
)
+
+// healthcheck service
+const (
+ HealthCheckServiceTypeName = "DubbogoHealthServer"
+ HealthCheckServiceInterface = "grpc.health.v1.Health"
+)
diff --git a/config/provider_config.go b/config/provider_config.go
index 0c024947c..51d78e5d6 100644
--- a/config/provider_config.go
+++ b/config/provider_config.go
@@ -111,6 +111,17 @@ func (c *ProviderConfig) Init(rc *RootConfig) error {
for k, v := range rc.Protocols {
if v.Name == tripleConstant.TRIPLE {
+ // Auto create grpc based health check service.
+ healthService := NewServiceConfigBuilder().
+ SetProtocolIDs(k).
+ SetNotRegister(true).
+ SetInterface(constant.HealthCheckServiceInterface).
+ Build()
+ if err := healthService.Init(rc); err != nil {
+ return err
+ }
+ c.Services[constant.HealthCheckServiceTypeName] = healthService
+
// Auto create reflection service configure only when provider with triple service is configured.
tripleReflectionService := NewServiceConfigBuilder().
SetProtocolIDs(k).
@@ -120,7 +131,9 @@ func (c *ProviderConfig) Init(rc *RootConfig) error {
if err := tripleReflectionService.Init(rc); err != nil {
return err
}
+ // Maybe only register once, If setting this service, break from traversing Protocols.
c.Services[constant.ReflectionServiceTypeName] = tripleReflectionService
+ break
}
}
@@ -144,8 +157,9 @@ func (c *ProviderConfig) Load() {
for registeredTypeName, service := range GetProviderServiceMap() {
serviceConfig, ok := c.Services[registeredTypeName]
if !ok {
- if registeredTypeName == constant.ReflectionServiceTypeName {
- // do not auto generate reflection server's configuration.
+ if registeredTypeName == constant.ReflectionServiceTypeName ||
+ registeredTypeName == constant.HealthCheckServiceTypeName {
+ // do not auto generate reflection or health check server's configuration.
continue
}
// service doesn't config in config file, create one with default
diff --git a/config/provider_config_test.go b/config/provider_config_test.go
index 1369b617e..8f954941b 100644
--- a/config/provider_config_test.go
+++ b/config/provider_config_test.go
@@ -42,7 +42,8 @@ func TestProviderConfigRootRegistry(t *testing.T) {
assert.Nil(t, err)
provider := rootConfig.Provider
assert.NotNil(t, provider)
- assert.Equal(t, 2, len(provider.Services))
+ assert.NotNil(t, provider.Services["HelloService"])
+ assert.NotNil(t, provider.Services["OrderService"])
assert.Equal(t, 2, len(provider.Services["HelloService"].RegistryIDs))
assert.Equal(t, 1, len(provider.Services["OrderService"].RegistryIDs))
diff --git a/imports/imports.go b/imports/imports.go
index 9e26015fa..5ef6cb739 100644
--- a/imports/imports.go
+++ b/imports/imports.go
@@ -64,6 +64,7 @@ import (
_ "dubbo.apache.org/dubbo-go/v3/metrics/prometheus"
_ "dubbo.apache.org/dubbo-go/v3/protocol/dubbo"
_ "dubbo.apache.org/dubbo-go/v3/protocol/dubbo3"
+ _ "dubbo.apache.org/dubbo-go/v3/protocol/dubbo3/health"
_ "dubbo.apache.org/dubbo-go/v3/protocol/dubbo3/reflection"
_ "dubbo.apache.org/dubbo-go/v3/protocol/grpc"
_ "dubbo.apache.org/dubbo-go/v3/protocol/jsonrpc"
diff --git a/protocol/dubbo3/health/serverhealth.go b/protocol/dubbo3/health/serverhealth.go
new file mode 100644
index 000000000..2d4a9097f
--- /dev/null
+++ b/protocol/dubbo3/health/serverhealth.go
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package health provides a service that exposes server's health and it must be
+// imported to enable support for client-side health checks.
+package health
+
+import (
+ "context"
+ "sync"
+)
+
+import (
+ "github.com/dubbogo/grpc-go/codes"
+ "github.com/dubbogo/grpc-go/status"
+)
+
+import (
+ logger "dubbo.apache.org/dubbo-go/v3/common/logger"
+ "dubbo.apache.org/dubbo-go/v3/config"
+ healthpb "dubbo.apache.org/dubbo-go/v3/protocol/dubbo3/health/triple_health_v1"
+ healthtriple "dubbo.apache.org/dubbo-go/v3/protocol/dubbo3/health/triple_health_v1"
+)
+
+// Server implements `service Health`.
+type DubbogoHealthServer struct {
+ healthtriple.UnimplementedHealthServer
+ mu sync.RWMutex
+ // If shutdown is true, it's expected all serving status is NOT_SERVING, and
+ // will stay in NOT_SERVING.
+ shutdown bool
+ // statusMap stores the serving status of the services this Server monitors.
+ statusMap map[string]healthpb.HealthCheckResponse_ServingStatus
+ updates map[string]map[healthtriple.Health_WatchServer]chan healthpb.HealthCheckResponse_ServingStatus
+}
+
+var healthServer *DubbogoHealthServer
+
+// NewServer returns a new Server.
+func NewServer() *DubbogoHealthServer {
+ return &DubbogoHealthServer{
+ statusMap: map[string]healthpb.HealthCheckResponse_ServingStatus{"": healthpb.HealthCheckResponse_SERVING},
+ updates: make(map[string]map[healthtriple.Health_WatchServer]chan healthpb.HealthCheckResponse_ServingStatus),
+ }
+}
+
+// Check implements `service Health`.
+func (s *DubbogoHealthServer) Check(ctx context.Context, in *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+ if servingStatus, ok := s.statusMap[in.Service]; ok {
+ return &healthpb.HealthCheckResponse{
+ Status: servingStatus,
+ }, nil
+ }
+ return nil, status.Error(codes.NotFound, "unknown service")
+}
+
+// Watch implements `service Health`.
+func (s *DubbogoHealthServer) Watch(in *healthpb.HealthCheckRequest, stream healthtriple.Health_WatchServer) error {
+ service := in.Service
+ // update channel is used for getting service status updates.
+ update := make(chan healthpb.HealthCheckResponse_ServingStatus, 1)
+ s.mu.Lock()
+ // Puts the initial status to the channel.
+ if servingStatus, ok := s.statusMap[service]; ok {
+ update <- servingStatus
+ } else {
+ update <- healthpb.HealthCheckResponse_SERVICE_UNKNOWN
+ }
+
+ // Registers the update channel to the correct place in the updates map.
+ if _, ok := s.updates[service]; !ok {
+ s.updates[service] = make(map[healthtriple.Health_WatchServer]chan healthpb.HealthCheckResponse_ServingStatus)
+ }
+ s.updates[service][stream] = update
+ defer func() {
+ s.mu.Lock()
+ delete(s.updates[service], stream)
+ s.mu.Unlock()
+ }()
+ s.mu.Unlock()
+
+ var lastSentStatus healthpb.HealthCheckResponse_ServingStatus = -1
+ for {
+ select {
+ // Status updated. Sends the up-to-date status to the client.
+ case servingStatus := <-update:
+ if lastSentStatus == servingStatus {
+ continue
+ }
+ lastSentStatus = servingStatus
+ err := stream.Send(&healthpb.HealthCheckResponse{Status: servingStatus})
+ if err != nil {
+ return status.Error(codes.Canceled, "Stream has ended.")
+ }
+ // Context done. Removes the update channel from the updates map.
+ case <-stream.Context().Done():
+ return status.Error(codes.Canceled, "Stream has ended.")
+ }
+ }
+}
+
+// SetServingStatus is called when need to reset the serving status of a service
+// or insert a new service entry into the statusMap.
+func (s *DubbogoHealthServer) SetServingStatus(service string, servingStatus healthpb.HealthCheckResponse_ServingStatus) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if s.shutdown {
+ logger.Infof("health: status changing for %s to %v is ignored because health service is shutdown", service, servingStatus)
+ return
+ }
+
+ s.setServingStatusLocked(service, servingStatus)
+}
+
+func (s *DubbogoHealthServer) setServingStatusLocked(service string, servingStatus healthpb.HealthCheckResponse_ServingStatus) {
+ s.statusMap[service] = servingStatus
+ for _, update := range s.updates[service] {
+ // Clears previous updates, that are not sent to the client, from the channel.
+ // This can happen if the client is not reading and the server gets flow control limited.
+ select {
+ case <-update:
+ default:
+ }
+ // Puts the most recent update to the channel.
+ update <- servingStatus
+ }
+}
+
+// Shutdown sets all serving status to NOT_SERVING, and configures the server to
+// ignore all future status changes.
+//
+// This changes serving status for all services. To set status for a particular
+// services, call SetServingStatus().
+func (s *DubbogoHealthServer) Shutdown() {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ s.shutdown = true
+ for service := range s.statusMap {
+ s.setServingStatusLocked(service, healthpb.HealthCheckResponse_NOT_SERVING)
+ }
+}
+
+// Resume sets all serving status to SERVING, and configures the server to
+// accept all future status changes.
+//
+// This changes serving status for all services. To set status for a particular
+// services, call SetServingStatus().
+func (s *DubbogoHealthServer) Resume() {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ s.shutdown = false
+ for service := range s.statusMap {
+ s.setServingStatusLocked(service, healthpb.HealthCheckResponse_SERVING)
+ }
+}
+
+// Set health check interface.
+func init() {
+ healthServer = NewServer()
+ config.SetProviderService(healthServer)
+}
+
+func SetServingStatusServing(service string) {
+ healthServer.SetServingStatus(service, healthpb.HealthCheckResponse_SERVING)
+}
+
+func SetServingStatusNotServing(service string) {
+ healthServer.SetServingStatus(service, healthpb.HealthCheckResponse_NOT_SERVING)
+}
diff --git a/protocol/dubbo3/health/triple_health_v1/health.pb.go b/protocol/dubbo3/health/triple_health_v1/health.pb.go
new file mode 100644
index 000000000..e8bc5f970
--- /dev/null
+++ b/protocol/dubbo3/health/triple_health_v1/health.pb.go
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// The canonical version of this proto can be found at
+// https://github.com/grpc/grpc-proto/blob/master/grpc/health/v1/health.proto
+
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// versions:
+// protoc-gen-go v1.28.0
+// protoc v3.20.1
+// source: protocol/dubbo3/health/triple_health_v1/health.proto
+
+package triple_health_v1
+
+import (
+ reflect "reflect"
+ sync "sync"
+)
+
+import (
+ protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+
+ protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+)
+
+const (
+ // Verify that this generated code is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
+ // Verify that runtime/protoimpl is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
+)
+
+type HealthCheckResponse_ServingStatus int32
+
+const (
+ HealthCheckResponse_UNKNOWN HealthCheckResponse_ServingStatus = 0
+ HealthCheckResponse_SERVING HealthCheckResponse_ServingStatus = 1
+ HealthCheckResponse_NOT_SERVING HealthCheckResponse_ServingStatus = 2
+ HealthCheckResponse_SERVICE_UNKNOWN HealthCheckResponse_ServingStatus = 3 // Used only by the Watch method.
+)
+
+// Enum value maps for HealthCheckResponse_ServingStatus.
+var (
+ HealthCheckResponse_ServingStatus_name = map[int32]string{
+ 0: "UNKNOWN",
+ 1: "SERVING",
+ 2: "NOT_SERVING",
+ 3: "SERVICE_UNKNOWN",
+ }
+ HealthCheckResponse_ServingStatus_value = map[string]int32{
+ "UNKNOWN": 0,
+ "SERVING": 1,
+ "NOT_SERVING": 2,
+ "SERVICE_UNKNOWN": 3,
+ }
+)
+
+func (x HealthCheckResponse_ServingStatus) Enum() *HealthCheckResponse_ServingStatus {
+ p := new(HealthCheckResponse_ServingStatus)
+ *p = x
+ return p
+}
+
+func (x HealthCheckResponse_ServingStatus) String() string {
+ return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x))
+}
+
+func (HealthCheckResponse_ServingStatus) Descriptor() protoreflect.EnumDescriptor {
+ return file_protocol_dubbo3_health_triple_health_v1_health_proto_enumTypes[0].Descriptor()
+}
+
+func (HealthCheckResponse_ServingStatus) Type() protoreflect.EnumType {
+ return &file_protocol_dubbo3_health_triple_health_v1_health_proto_enumTypes[0]
+}
+
+func (x HealthCheckResponse_ServingStatus) Number() protoreflect.EnumNumber {
+ return protoreflect.EnumNumber(x)
+}
+
+// Deprecated: Use HealthCheckResponse_ServingStatus.Descriptor instead.
+func (HealthCheckResponse_ServingStatus) EnumDescriptor() ([]byte, []int) {
+ return file_protocol_dubbo3_health_triple_health_v1_health_proto_rawDescGZIP(), []int{1, 0}
+}
+
+type HealthCheckRequest struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Service string `protobuf:"bytes,1,opt,name=service,proto3" json:"service,omitempty"`
+}
+
+func (x *HealthCheckRequest) Reset() {
+ *x = HealthCheckRequest{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_protocol_dubbo3_health_triple_health_v1_health_proto_msgTypes[0]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *HealthCheckRequest) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*HealthCheckRequest) ProtoMessage() {}
+
+func (x *HealthCheckRequest) ProtoReflect() protoreflect.Message {
+ mi := &file_protocol_dubbo3_health_triple_health_v1_health_proto_msgTypes[0]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use HealthCheckRequest.ProtoReflect.Descriptor instead.
+func (*HealthCheckRequest) Descriptor() ([]byte, []int) {
+ return file_protocol_dubbo3_health_triple_health_v1_health_proto_rawDescGZIP(), []int{0}
+}
+
+func (x *HealthCheckRequest) GetService() string {
+ if x != nil {
+ return x.Service
+ }
+ return ""
+}
+
+type HealthCheckResponse struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Status HealthCheckResponse_ServingStatus `protobuf:"varint,1,opt,name=status,proto3,enum=grpc.health.v1.HealthCheckResponse_ServingStatus" json:"status,omitempty"`
+}
+
+func (x *HealthCheckResponse) Reset() {
+ *x = HealthCheckResponse{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_protocol_dubbo3_health_triple_health_v1_health_proto_msgTypes[1]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *HealthCheckResponse) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*HealthCheckResponse) ProtoMessage() {}
+
+func (x *HealthCheckResponse) ProtoReflect() protoreflect.Message {
+ mi := &file_protocol_dubbo3_health_triple_health_v1_health_proto_msgTypes[1]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use HealthCheckResponse.ProtoReflect.Descriptor instead.
+func (*HealthCheckResponse) Descriptor() ([]byte, []int) {
+ return file_protocol_dubbo3_health_triple_health_v1_health_proto_rawDescGZIP(), []int{1}
+}
+
+func (x *HealthCheckResponse) GetStatus() HealthCheckResponse_ServingStatus {
+ if x != nil {
+ return x.Status
+ }
+ return HealthCheckResponse_UNKNOWN
+}
+
+var File_protocol_dubbo3_health_triple_health_v1_health_proto protoreflect.FileDescriptor
+
+var file_protocol_dubbo3_health_triple_health_v1_health_proto_rawDesc = []byte{
+ 0x0a, 0x34, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2f, 0x64, 0x75, 0x62, 0x62, 0x6f,
+ 0x33, 0x2f, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2f, 0x74, 0x72, 0x69, 0x70, 0x6c, 0x65, 0x5f,
+ 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x5f, 0x76, 0x31, 0x2f, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68,
+ 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x68, 0x65, 0x61,
+ 0x6c, 0x74, 0x68, 0x2e, 0x76, 0x31, 0x22, 0x2e, 0x0a, 0x12, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68,
+ 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x18, 0x0a, 0x07,
+ 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x73,
+ 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x22, 0xb1, 0x01, 0x0a, 0x13, 0x48, 0x65, 0x61, 0x6c, 0x74,
+ 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x49,
+ 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x31,
+ 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2e, 0x76, 0x31, 0x2e,
+ 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f,
+ 0x6e, 0x73, 0x65, 0x2e, 0x53, 0x65, 0x72, 0x76, 0x69, 0x6e, 0x67, 0x53, 0x74, 0x61, 0x74, 0x75,
+ 0x73, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0x4f, 0x0a, 0x0d, 0x53, 0x65, 0x72,
+ 0x76, 0x69, 0x6e, 0x67, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x4e,
+ 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x53, 0x45, 0x52, 0x56, 0x49,
+ 0x4e, 0x47, 0x10, 0x01, 0x12, 0x0f, 0x0a, 0x0b, 0x4e, 0x4f, 0x54, 0x5f, 0x53, 0x45, 0x52, 0x56,
+ 0x49, 0x4e, 0x47, 0x10, 0x02, 0x12, 0x13, 0x0a, 0x0f, 0x53, 0x45, 0x52, 0x56, 0x49, 0x43, 0x45,
+ 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x03, 0x32, 0xae, 0x01, 0x0a, 0x06, 0x48,
+ 0x65, 0x61, 0x6c, 0x74, 0x68, 0x12, 0x50, 0x0a, 0x05, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x12, 0x22,
+ 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2e, 0x76, 0x31, 0x2e,
+ 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65,
+ 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68,
+ 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52,
+ 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x52, 0x0a, 0x05, 0x57, 0x61, 0x74, 0x63, 0x68,
+ 0x12, 0x22, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2e, 0x76,
+ 0x31, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x71,
+ 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x68, 0x65, 0x61, 0x6c,
+ 0x74, 0x68, 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63,
+ 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x30, 0x01, 0x42, 0x34, 0x5a, 0x32, 0x67,
+ 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x64, 0x75, 0x62, 0x62, 0x6f, 0x67,
+ 0x6f, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2d, 0x67, 0x6f, 0x2f, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68,
+ 0x2f, 0x74, 0x72, 0x69, 0x70, 0x6c, 0x65, 0x5f, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x5f, 0x76,
+ 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+}
+
+var (
+ file_protocol_dubbo3_health_triple_health_v1_health_proto_rawDescOnce sync.Once
+ file_protocol_dubbo3_health_triple_health_v1_health_proto_rawDescData = file_protocol_dubbo3_health_triple_health_v1_health_proto_rawDesc
+)
+
+func file_protocol_dubbo3_health_triple_health_v1_health_proto_rawDescGZIP() []byte {
+ file_protocol_dubbo3_health_triple_health_v1_health_proto_rawDescOnce.Do(func() {
+ file_protocol_dubbo3_health_triple_health_v1_health_proto_rawDescData = protoimpl.X.CompressGZIP(file_protocol_dubbo3_health_triple_health_v1_health_proto_rawDescData)
+ })
+ return file_protocol_dubbo3_health_triple_health_v1_health_proto_rawDescData
+}
+
+var file_protocol_dubbo3_health_triple_health_v1_health_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
+var file_protocol_dubbo3_health_triple_health_v1_health_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
+var file_protocol_dubbo3_health_triple_health_v1_health_proto_goTypes = []interface{}{
+ (HealthCheckResponse_ServingStatus)(0), // 0: grpc.health.v1.HealthCheckResponse.ServingStatus
+ (*HealthCheckRequest)(nil), // 1: grpc.health.v1.HealthCheckRequest
+ (*HealthCheckResponse)(nil), // 2: grpc.health.v1.HealthCheckResponse
+}
+var file_protocol_dubbo3_health_triple_health_v1_health_proto_depIdxs = []int32{
+ 0, // 0: grpc.health.v1.HealthCheckResponse.status:type_name -> grpc.health.v1.HealthCheckResponse.ServingStatus
+ 1, // 1: grpc.health.v1.Health.Check:input_type -> grpc.health.v1.HealthCheckRequest
+ 1, // 2: grpc.health.v1.Health.Watch:input_type -> grpc.health.v1.HealthCheckRequest
+ 2, // 3: grpc.health.v1.Health.Check:output_type -> grpc.health.v1.HealthCheckResponse
+ 2, // 4: grpc.health.v1.Health.Watch:output_type -> grpc.health.v1.HealthCheckResponse
+ 3, // [3:5] is the sub-list for method output_type
+ 1, // [1:3] is the sub-list for method input_type
+ 1, // [1:1] is the sub-list for extension type_name
+ 1, // [1:1] is the sub-list for extension extendee
+ 0, // [0:1] is the sub-list for field type_name
+}
+
+func init() { file_protocol_dubbo3_health_triple_health_v1_health_proto_init() }
+func file_protocol_dubbo3_health_triple_health_v1_health_proto_init() {
+ if File_protocol_dubbo3_health_triple_health_v1_health_proto != nil {
+ return
+ }
+ if !protoimpl.UnsafeEnabled {
+ file_protocol_dubbo3_health_triple_health_v1_health_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*HealthCheckRequest); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_protocol_dubbo3_health_triple_health_v1_health_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*HealthCheckResponse); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ }
+ type x struct{}
+ out := protoimpl.TypeBuilder{
+ File: protoimpl.DescBuilder{
+ GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
+ RawDescriptor: file_protocol_dubbo3_health_triple_health_v1_health_proto_rawDesc,
+ NumEnums: 1,
+ NumMessages: 2,
+ NumExtensions: 0,
+ NumServices: 1,
+ },
+ GoTypes: file_protocol_dubbo3_health_triple_health_v1_health_proto_goTypes,
+ DependencyIndexes: file_protocol_dubbo3_health_triple_health_v1_health_proto_depIdxs,
+ EnumInfos: file_protocol_dubbo3_health_triple_health_v1_health_proto_enumTypes,
+ MessageInfos: file_protocol_dubbo3_health_triple_health_v1_health_proto_msgTypes,
+ }.Build()
+ File_protocol_dubbo3_health_triple_health_v1_health_proto = out.File
+ file_protocol_dubbo3_health_triple_health_v1_health_proto_rawDesc = nil
+ file_protocol_dubbo3_health_triple_health_v1_health_proto_goTypes = nil
+ file_protocol_dubbo3_health_triple_health_v1_health_proto_depIdxs = nil
+}
diff --git a/protocol/dubbo3/health/triple_health_v1/health.proto b/protocol/dubbo3/health/triple_health_v1/health.proto
new file mode 100644
index 000000000..ae5534838
--- /dev/null
+++ b/protocol/dubbo3/health/triple_health_v1/health.proto
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// The canonical version of this proto can be found at
+// https://github.com/grpc/grpc-proto/blob/master/grpc/health/v1/health.proto
+
+syntax = "proto3";
+
+package grpc.health.v1;
+
+option go_package = "github.com/dubbogo/grpc-go/health/triple_health_v1";
+
+message HealthCheckRequest {
+ string service = 1;
+}
+
+message HealthCheckResponse {
+ enum ServingStatus {
+ UNKNOWN = 0;
+ SERVING = 1;
+ NOT_SERVING = 2;
+ SERVICE_UNKNOWN = 3; // Used only by the Watch method.
+ }
+ ServingStatus status = 1;
+}
+
+service Health {
+ // If the requested service is unknown, the call will fail with status
+ // NOT_FOUND.
+ rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
+
+ // Performs a watch for the serving status of the requested service.
+ // The server will immediately send back a message indicating the current
+ // serving status. It will then subsequently send a new message whenever
+ // the service's serving status changes.
+ //
+ // If the requested service is unknown when the call is received, the
+ // server will send a message setting the serving status to
+ // SERVICE_UNKNOWN but will *not* terminate the call. If at some
+ // future point, the serving status of the service becomes known, the
+ // server will send a new message with the service's serving status.
+ //
+ // If the call terminates with status UNIMPLEMENTED, then clients
+ // should assume this method is not supported and should not retry the
+ // call. If the call terminates with any other status (including OK),
+ // clients should retry the call with appropriate exponential backoff.
+ rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse);
+}
\ No newline at end of file
diff --git a/protocol/dubbo3/health/triple_health_v1/health_triple.pb.go b/protocol/dubbo3/health/triple_health_v1/health_triple.pb.go
new file mode 100644
index 000000000..a63d394d5
--- /dev/null
+++ b/protocol/dubbo3/health/triple_health_v1/health_triple.pb.go
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Code generated by protoc-gen-go-triple. DO NOT EDIT.
+// versions:
+// - protoc-gen-go-triple v1.0.8
+// - protoc v3.20.1
+// source: protocol/dubbo3/health/triple_health_v1/health.proto
+
+package triple_health_v1
+
+import (
+ context "context"
+ fmt "fmt"
+)
+
+import (
+ grpc_go "github.com/dubbogo/grpc-go"
+ codes "github.com/dubbogo/grpc-go/codes"
+ metadata "github.com/dubbogo/grpc-go/metadata"
+ status "github.com/dubbogo/grpc-go/status"
+
+ common "github.com/dubbogo/triple/pkg/common"
+ constant "github.com/dubbogo/triple/pkg/common/constant"
+ triple "github.com/dubbogo/triple/pkg/triple"
+)
+
+import (
+ constant1 "dubbo.apache.org/dubbo-go/v3/common/constant"
+ protocol "dubbo.apache.org/dubbo-go/v3/protocol"
+ dubbo3 "dubbo.apache.org/dubbo-go/v3/protocol/dubbo3"
+ invocation "dubbo.apache.org/dubbo-go/v3/protocol/invocation"
+)
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the grpc package it is being compiled against.
+const _ = grpc_go.SupportPackageIsVersion7
+
+// HealthClient is the client API for Health service.
+//
+// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
+type HealthClient interface {
+ // If the requested service is unknown, the call will fail with status
+ // NOT_FOUND.
+ Check(ctx context.Context, in *HealthCheckRequest, opts ...grpc_go.CallOption) (*HealthCheckResponse, common.ErrorWithAttachment)
+ // Performs a watch for the serving status of the requested service.
+ // The server will immediately send back a message indicating the current
+ // serving status. It will then subsequently send a new message whenever
+ // the service's serving status changes.
+ //
+ // If the requested service is unknown when the call is received, the
+ // server will send a message setting the serving status to
+ // SERVICE_UNKNOWN but will *not* terminate the call. If at some
+ // future point, the serving status of the service becomes known, the
+ // server will send a new message with the service's serving status.
+ //
+ // If the call terminates with status UNIMPLEMENTED, then clients
+ // should assume this method is not supported and should not retry the
+ // call. If the call terminates with any other status (including OK),
+ // clients should retry the call with appropriate exponential backoff.
+ Watch(ctx context.Context, in *HealthCheckRequest, opts ...grpc_go.CallOption) (Health_WatchClient, error)
+}
+
+type healthClient struct {
+ cc *triple.TripleConn
+}
+
+type HealthClientImpl struct {
+ Check func(ctx context.Context, in *HealthCheckRequest) (*HealthCheckResponse, error)
+ Watch func(ctx context.Context, in *HealthCheckRequest) (Health_WatchClient, error)
+}
+
+func (c *HealthClientImpl) GetDubboStub(cc *triple.TripleConn) HealthClient {
+ return NewHealthClient(cc)
+}
+
+func (c *HealthClientImpl) XXX_InterfaceName() string {
+ return "grpc.health.v1.Health"
+}
+
+func NewHealthClient(cc *triple.TripleConn) HealthClient {
+ return &healthClient{cc}
+}
+
+func (c *healthClient) Check(ctx context.Context, in *HealthCheckRequest, opts ...grpc_go.CallOption) (*HealthCheckResponse, common.ErrorWithAttachment) {
+ out := new(HealthCheckResponse)
+ interfaceKey := ctx.Value(constant.InterfaceKey).(string)
+ return out, c.cc.Invoke(ctx, "/"+interfaceKey+"/Check", in, out)
+}
+
+func (c *healthClient) Watch(ctx context.Context, in *HealthCheckRequest, opts ...grpc_go.CallOption) (Health_WatchClient, error) {
+ interfaceKey := ctx.Value(constant.InterfaceKey).(string)
+ stream, err := c.cc.NewStream(ctx, "/"+interfaceKey+"/Watch", opts...)
+ if err != nil {
+ return nil, err
+ }
+ x := &healthWatchClient{stream}
+ if err := x.ClientStream.SendMsg(in); err != nil {
+ return nil, err
+ }
+ if err := x.ClientStream.CloseSend(); err != nil {
+ return nil, err
+ }
+ return x, nil
+}
+
+type Health_WatchClient interface {
+ Recv() (*HealthCheckResponse, error)
+ grpc_go.ClientStream
+}
+
+type healthWatchClient struct {
+ grpc_go.ClientStream
+}
+
+func (x *healthWatchClient) Recv() (*HealthCheckResponse, error) {
+ m := new(HealthCheckResponse)
+ if err := x.ClientStream.RecvMsg(m); err != nil {
+ return nil, err
+ }
+ return m, nil
+}
+
+// HealthServer is the server API for Health service.
+// All implementations must embed UnimplementedHealthServer
+// for forward compatibility
+type HealthServer interface {
+ // If the requested service is unknown, the call will fail with status
+ // NOT_FOUND.
+ Check(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error)
+ // Performs a watch for the serving status of the requested service.
+ // The server will immediately send back a message indicating the current
+ // serving status. It will then subsequently send a new message whenever
+ // the service's serving status changes.
+ //
+ // If the requested service is unknown when the call is received, the
+ // server will send a message setting the serving status to
+ // SERVICE_UNKNOWN but will *not* terminate the call. If at some
+ // future point, the serving status of the service becomes known, the
+ // server will send a new message with the service's serving status.
+ //
+ // If the call terminates with status UNIMPLEMENTED, then clients
+ // should assume this method is not supported and should not retry the
+ // call. If the call terminates with any other status (including OK),
+ // clients should retry the call with appropriate exponential backoff.
+ Watch(*HealthCheckRequest, Health_WatchServer) error
+ mustEmbedUnimplementedHealthServer()
+}
+
+// UnimplementedHealthServer must be embedded to have forward compatible implementations.
+type UnimplementedHealthServer struct {
+ proxyImpl protocol.Invoker
+}
+
+func (UnimplementedHealthServer) Check(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error) {
+ return nil, status.Errorf(codes.Unimplemented, "method Check not implemented")
+}
+func (UnimplementedHealthServer) Watch(*HealthCheckRequest, Health_WatchServer) error {
+ return status.Errorf(codes.Unimplemented, "method Watch not implemented")
+}
+func (s *UnimplementedHealthServer) XXX_SetProxyImpl(impl protocol.Invoker) {
+ s.proxyImpl = impl
+}
+
+func (s *UnimplementedHealthServer) XXX_GetProxyImpl() protocol.Invoker {
+ return s.proxyImpl
+}
+
+func (s *UnimplementedHealthServer) XXX_ServiceDesc() *grpc_go.ServiceDesc {
+ return &Health_ServiceDesc
+}
+func (s *UnimplementedHealthServer) XXX_InterfaceName() string {
+ return "grpc.health.v1.Health"
+}
+
+func (UnimplementedHealthServer) mustEmbedUnimplementedHealthServer() {}
+
+// UnsafeHealthServer may be embedded to opt out of forward compatibility for this service.
+// Use of this interface is not recommended, as added methods to HealthServer will
+// result in compilation errors.
+type UnsafeHealthServer interface {
+ mustEmbedUnimplementedHealthServer()
+}
+
+func RegisterHealthServer(s grpc_go.ServiceRegistrar, srv HealthServer) {
+ s.RegisterService(&Health_ServiceDesc, srv)
+}
+
+func _Health_Check_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc_go.UnaryServerInterceptor) (interface{}, error) {
+ in := new(HealthCheckRequest)
+ if err := dec(in); err != nil {
+ return nil, err
+ }
+ base := srv.(dubbo3.Dubbo3GrpcService)
+ args := []interface{}{}
+ args = append(args, in)
+ md, _ := metadata.FromIncomingContext(ctx)
+ invAttachment := make(map[string]interface{}, len(md))
+ for k, v := range md {
+ invAttachment[k] = v
+ }
+ invo := invocation.NewRPCInvocation("Check", args, invAttachment)
+ if interceptor == nil {
+ result := base.XXX_GetProxyImpl().Invoke(ctx, invo)
+ return result, result.Error()
+ }
+ info := &grpc_go.UnaryServerInfo{
+ Server: srv,
+ FullMethod: ctx.Value("XXX_TRIPLE_GO_INTERFACE_NAME").(string),
+ }
+ handler := func(ctx context.Context, req interface{}) (interface{}, error) {
+ result := base.XXX_GetProxyImpl().Invoke(ctx, invo)
+ return result, result.Error()
+ }
+ return interceptor(ctx, in, info, handler)
+}
+
+func _Health_Watch_Handler(srv interface{}, stream grpc_go.ServerStream) error {
+ _, ok := srv.(dubbo3.Dubbo3GrpcService)
+ ctx := stream.Context()
+ md, _ := metadata.FromIncomingContext(ctx)
+ invAttachment := make(map[string]interface{}, len(md))
+ for k, v := range md {
+ invAttachment[k] = v
+ }
+ stream.(grpc_go.CtxSetterStream).SetContext(context.WithValue(ctx, constant1.AttachmentKey, invAttachment))
+ invo := invocation.NewRPCInvocation("Watch", nil, nil)
+ if !ok {
+ fmt.Println(invo)
+ return nil
+ }
+ m := new(HealthCheckRequest)
+ if err := stream.RecvMsg(m); err != nil {
+ return err
+ }
+ return srv.(HealthServer).Watch(m, &healthWatchServer{stream})
+}
+
+type Health_WatchServer interface {
+ Send(*HealthCheckResponse) error
+ grpc_go.ServerStream
+}
+
+type healthWatchServer struct {
+ grpc_go.ServerStream
+}
+
+func (x *healthWatchServer) Send(m *HealthCheckResponse) error {
+ return x.ServerStream.SendMsg(m)
+}
+
+// Health_ServiceDesc is the grpc_go.ServiceDesc for Health service.
+// It's only intended for direct use with grpc_go.RegisterService,
+// and not to be introspected or modified (even as a copy)
+var Health_ServiceDesc = grpc_go.ServiceDesc{
+ ServiceName: "grpc.health.v1.Health",
+ HandlerType: (*HealthServer)(nil),
+ Methods: []grpc_go.MethodDesc{
+ {
+ MethodName: "Check",
+ Handler: _Health_Check_Handler,
+ },
+ },
+ Streams: []grpc_go.StreamDesc{
+ {
+ StreamName: "Watch",
+ Handler: _Health_Watch_Handler,
+ ServerStreams: true,
+ },
+ },
+ Metadata: "protocol/dubbo3/health/triple_health_v1/health.proto",
+}
diff --git a/protocol/dubbo3/reflection/serverreflection.go b/protocol/dubbo3/reflection/serverreflection.go
index 74c7a92a3..caf19c38a 100644
--- a/protocol/dubbo3/reflection/serverreflection.go
+++ b/protocol/dubbo3/reflection/serverreflection.go
@@ -60,7 +60,7 @@ type GRPCServer interface {
var _ GRPCServer = (*grpc.Server)(nil)
-type XXX_serverReflectionServer struct {
+type DubbogoServerReflectionServer struct {
rpb.UnimplementedServerReflectionServer
s GRPCServer
@@ -69,7 +69,7 @@ type XXX_serverReflectionServer struct {
symbols map[string]*dpb.FileDescriptorProto // map of fully-qualified names to files
}
-func (r *XXX_serverReflectionServer) SetGRPCServer(s *grpc.Server) {
+func (r *DubbogoServerReflectionServer) SetGRPCServer(s *grpc.Server) {
r.s = s
}
@@ -81,7 +81,7 @@ type protoMessage interface {
Descriptor() ([]byte, []int)
}
-func (s *XXX_serverReflectionServer) getSymbols() (svcNames []string, symbolIndex map[string]*dpb.FileDescriptorProto) {
+func (s *DubbogoServerReflectionServer) getSymbols() (svcNames []string, symbolIndex map[string]*dpb.FileDescriptorProto) {
s.initSymbols.Do(func() {
serviceInfo := s.s.GetServiceInfo()
@@ -106,7 +106,7 @@ func (s *XXX_serverReflectionServer) getSymbols() (svcNames []string, symbolInde
return s.serviceNames, s.symbols
}
-func (s *XXX_serverReflectionServer) processFile(fd *dpb.FileDescriptorProto, processed map[string]struct{}) {
+func (s *DubbogoServerReflectionServer) processFile(fd *dpb.FileDescriptorProto, processed map[string]struct{}) {
filename := fd.GetName()
if _, ok := processed[filename]; ok {
return
@@ -143,7 +143,7 @@ func (s *XXX_serverReflectionServer) processFile(fd *dpb.FileDescriptorProto, pr
}
}
-func (s *XXX_serverReflectionServer) processMessage(fd *dpb.FileDescriptorProto, prefix string, msg *dpb.DescriptorProto) {
+func (s *DubbogoServerReflectionServer) processMessage(fd *dpb.FileDescriptorProto, prefix string, msg *dpb.DescriptorProto) {
msgName := fqn(prefix, msg.GetName())
s.symbols[msgName] = fd
@@ -165,7 +165,7 @@ func (s *XXX_serverReflectionServer) processMessage(fd *dpb.FileDescriptorProto,
}
}
-func (s *XXX_serverReflectionServer) processEnum(fd *dpb.FileDescriptorProto, prefix string, en *dpb.EnumDescriptorProto) {
+func (s *DubbogoServerReflectionServer) processEnum(fd *dpb.FileDescriptorProto, prefix string, en *dpb.EnumDescriptorProto) {
enName := fqn(prefix, en.GetName())
s.symbols[enName] = fd
@@ -175,7 +175,7 @@ func (s *XXX_serverReflectionServer) processEnum(fd *dpb.FileDescriptorProto, pr
}
}
-func (s *XXX_serverReflectionServer) processField(fd *dpb.FileDescriptorProto, prefix string, fld *dpb.FieldDescriptorProto) {
+func (s *DubbogoServerReflectionServer) processField(fd *dpb.FileDescriptorProto, prefix string, fld *dpb.FieldDescriptorProto) {
fldName := fqn(prefix, fld.GetName())
s.symbols[fldName] = fd
}
@@ -189,7 +189,7 @@ func fqn(prefix, name string) string {
// fileDescForType gets the file descriptor for the given type.
// The given type should be a proto message.
-func (s *XXX_serverReflectionServer) fileDescForType(st reflect.Type) (*dpb.FileDescriptorProto, error) {
+func (s *DubbogoServerReflectionServer) fileDescForType(st reflect.Type) (*dpb.FileDescriptorProto, error) {
m, ok := reflect.Zero(reflect.PtrTo(st)).Interface().(protoMessage)
if !ok {
return nil, fmt.Errorf("failed to create message from type: %v", st)
@@ -258,7 +258,7 @@ func fileDescContainingExtension(st reflect.Type, ext int32) (*dpb.FileDescripto
return decodeFileDesc(proto.FileDescriptor(extDesc.Filename))
}
-func (s *XXX_serverReflectionServer) allExtensionNumbersForType(st reflect.Type) ([]int32, error) {
+func (s *DubbogoServerReflectionServer) allExtensionNumbersForType(st reflect.Type) ([]int32, error) {
m, ok := reflect.Zero(reflect.PtrTo(st)).Interface().(proto.Message)
if !ok {
return nil, fmt.Errorf("failed to create message from type: %v", st)
@@ -304,7 +304,7 @@ func fileDescWithDependencies(fd *dpb.FileDescriptorProto, sentFileDescriptors m
// fileDescEncodingByFilename finds the file descriptor for given filename,
// finds all of its previously unsent transitive dependencies, does marshaling
// on them, and returns the marshaled result.
-func (s *XXX_serverReflectionServer) fileDescEncodingByFilename(name string, sentFileDescriptors map[string]bool) ([][]byte, error) {
+func (s *DubbogoServerReflectionServer) fileDescEncodingByFilename(name string, sentFileDescriptors map[string]bool) ([][]byte, error) {
enc := proto.FileDescriptor(name)
if enc == nil {
return nil, fmt.Errorf("unknown file: %v", name)
@@ -338,7 +338,7 @@ func parseMetadata(meta interface{}) ([]byte, bool) {
// given symbol, finds all of its previously unsent transitive dependencies,
// does marshaling on them, and returns the marshaled result. The given symbol
// can be a type, a service or a method.
-func (s *XXX_serverReflectionServer) fileDescEncodingContainingSymbol(name string, sentFileDescriptors map[string]bool) ([][]byte, error) {
+func (s *DubbogoServerReflectionServer) fileDescEncodingContainingSymbol(name string, sentFileDescriptors map[string]bool) ([][]byte, error) {
_, symbols := s.getSymbols()
//
if strings.HasPrefix(name, "grpc.") {
@@ -367,7 +367,7 @@ func (s *XXX_serverReflectionServer) fileDescEncodingContainingSymbol(name strin
// fileDescEncodingContainingExtension finds the file descriptor containing
// given extension, finds all of its previously unsent transitive dependencies,
// does marshaling on them, and returns the marshaled result.
-func (s *XXX_serverReflectionServer) fileDescEncodingContainingExtension(typeName string, extNum int32, sentFileDescriptors map[string]bool) ([][]byte, error) {
+func (s *DubbogoServerReflectionServer) fileDescEncodingContainingExtension(typeName string, extNum int32, sentFileDescriptors map[string]bool) ([][]byte, error) {
st, err := typeForName(typeName)
if err != nil {
return nil, err
@@ -380,7 +380,7 @@ func (s *XXX_serverReflectionServer) fileDescEncodingContainingExtension(typeNam
}
// allExtensionNumbersForTypeName returns all extension numbers for the given type.
-func (s *XXX_serverReflectionServer) allExtensionNumbersForTypeName(name string) ([]int32, error) {
+func (s *DubbogoServerReflectionServer) allExtensionNumbersForTypeName(name string) ([]int32, error) {
st, err := typeForName(name)
if err != nil {
return nil, err
@@ -393,7 +393,7 @@ func (s *XXX_serverReflectionServer) allExtensionNumbersForTypeName(name string)
}
// ServerReflectionInfo is the reflection service handler.
-func (s *XXX_serverReflectionServer) ServerReflectionInfo(stream rpb.ServerReflection_ServerReflectionInfoServer) error {
+func (s *DubbogoServerReflectionServer) ServerReflectionInfo(stream rpb.ServerReflection_ServerReflectionInfoServer) error {
sentFileDescriptors := make(map[string]bool)
for {
in, err := stream.Recv()
@@ -493,5 +493,5 @@ func (s *XXX_serverReflectionServer) ServerReflectionInfo(stream rpb.ServerRefle
}
}
func init() {
- config.SetProviderService(&XXX_serverReflectionServer{})
+ config.SetProviderService(&DubbogoServerReflectionServer{})
}
diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go
index dad12df20..6287f79f8 100644
--- a/registry/protocol/protocol.go
+++ b/registry/protocol/protocol.go
@@ -39,6 +39,7 @@ import (
"dubbo.apache.org/dubbo-go/v3/config_center"
_ "dubbo.apache.org/dubbo-go/v3/config_center/configurator"
"dubbo.apache.org/dubbo-go/v3/protocol"
+ "dubbo.apache.org/dubbo-go/v3/protocol/dubbo3/health"
"dubbo.apache.org/dubbo-go/v3/protocol/protocolwrapper"
"dubbo.apache.org/dubbo-go/v3/registry"
_ "dubbo.apache.org/dubbo-go/v3/registry/directory"
@@ -191,6 +192,9 @@ func (proto *registryProtocol) Export(originInvoker protocol.Invoker) protocol.E
// export invoker
exporter := proto.doLocalExport(originInvoker, providerUrl)
+ // update health status
+ health.SetServingStatusServing(registryUrl.Service())
+
if len(registryUrl.Protocol) > 0 {
// url to registry
reg := proto.getRegistry(registryUrl)