You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by gu...@apache.org on 2021/09/01 05:55:36 UTC
[dubbo] branch 3.0 updated: [3.0-triple] Add tri inner health
service (#8644)
This is an automated email from the ASF dual-hosted git repository.
guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 7d84894 [3.0-triple] Add tri inner health service (#8644)
7d84894 is described below
commit 7d848941818709c822b70e2b57606b8f628b25fe
Author: earthchen <yo...@duobei.com>
AuthorDate: Wed Sep 1 00:55:15 2021 -0500
[3.0-triple] Add tri inner health service (#8644)
* Add tri inner health service
* Ignore generated files
* Rename to a better class name
Co-authored-by: guohao <gu...@gmail.com>
---
dubbo-rpc/dubbo-rpc-triple/pom.xml | 10 ++
.../dubbo/rpc/protocol/tri/AbstractStream.java | 40 +++---
.../apache/dubbo/rpc/protocol/tri/GrpcStatus.java | 9 +-
.../rpc/protocol/tri/ServerTransportObserver.java | 4 +-
.../dubbo/rpc/protocol/tri/TripleProtocol.java | 11 ++
.../protocol/tri/service/HealthStatusManager.java | 74 +++++++++++
.../protocol/tri/service/TriBuiltinService.java | 86 ++++++++++++
.../rpc/protocol/tri/service/TriHealthImpl.java | 145 +++++++++++++++++++++
.../dubbo-rpc-triple/src/main/proto/health.proto | 66 ++++++++++
pom.xml | 1 +
10 files changed, 420 insertions(+), 26 deletions(-)
diff --git a/dubbo-rpc/dubbo-rpc-triple/pom.xml b/dubbo-rpc/dubbo-rpc-triple/pom.xml
index cd82baf..c99cfb4 100644
--- a/dubbo-rpc/dubbo-rpc-triple/pom.xml
+++ b/dubbo-rpc/dubbo-rpc-triple/pom.xml
@@ -28,6 +28,7 @@
<description>The triple protocol module</description>
<properties>
<skip_maven_deploy>false</skip_maven_deploy>
+ <dubbo.compiler.version>0.0.3-SNAPSHOT</dubbo.compiler.version>
</properties>
<dependencies>
<dependency>
@@ -65,6 +66,15 @@
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.12.0:exe:${os.detected.classifier}</protocArtifact>
+ <protocPlugins>
+ <protocPlugin>
+ <id>dubbo</id>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-compiler</artifactId>
+ <version>${dubbo.compiler.version}</version>
+ <mainClass>org.apache.dubbo.gen.dubbo.Dubbo3Generator</mainClass>
+ </protocPlugin>
+ </protocPlugins>
</configuration>
<executions>
<execution>
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
index f3e9813..4265de7 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
@@ -185,10 +185,13 @@ public abstract class AbstractStream implements Stream {
return transportObserver;
}
- protected void transportError(GrpcStatus status, Map<String, Object> attachments) {
- // set metadata
- Metadata metadata = getMetaData(status);
- getTransportSubscriber().onMetadata(metadata, false);
+ // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#responses
+ protected void transportError(GrpcStatus status, Map<String, Object> attachments, boolean onlyTrailers) {
+ if (!onlyTrailers) {
+ // set metadata
+ Metadata metadata = new DefaultMetadata();
+ getTransportSubscriber().onMetadata(metadata, false);
+ }
// set trailers
Metadata trailers = getTrailers(status);
if (attachments != null) {
@@ -196,31 +199,22 @@ public abstract class AbstractStream implements Stream {
}
getTransportSubscriber().onMetadata(trailers, true);
if (LOGGER.isErrorEnabled()) {
- LOGGER.error("[Triple-Server-Error] " + status.toMessage());
+ LOGGER.error("[Triple-Server-Error] status=" + status.code.code + " service=" + getServiceDescriptor().getServiceName()
+ + " method=" + getMethodName() +" onlyTrailers=" + onlyTrailers, status.cause);
}
}
+ protected void transportError(GrpcStatus status, Map<String, Object> attachments) {
+ transportError(status, attachments,false);
+ }
+
protected void transportError(GrpcStatus status) {
transportError(status, null);
}
protected void transportError(Throwable throwable) {
GrpcStatus status = new GrpcStatus(Code.UNKNOWN, throwable, throwable.getMessage());
- Metadata metadata = getMetaData(status);
- getTransportSubscriber().onMetadata(metadata, false);
- Metadata trailers = getTrailers(status);
- getTransportSubscriber().onMetadata(trailers, true);
- if (LOGGER.isErrorEnabled()) {
- LOGGER.error("[Triple-Server-Error] service=" + getServiceDescriptor().getServiceName()
- + " method=" + getMethodName(), throwable);
- }
- }
-
- private Metadata getMetaData(GrpcStatus status) {
- Metadata metadata = new DefaultMetadata();
- metadata.put(TripleHeaderEnum.MESSAGE_KEY.getHeader(), getGrpcMessage(status));
- metadata.put(TripleHeaderEnum.STATUS_KEY.getHeader(), String.valueOf(status.code.code));
- return metadata;
+ transportError(status, null);
}
private String getGrpcMessage(GrpcStatus status) {
@@ -234,13 +228,17 @@ public abstract class AbstractStream implements Stream {
}
private Metadata getTrailers(GrpcStatus grpcStatus) {
-
Metadata metadata = new DefaultMetadata();
+ metadata.put(TripleHeaderEnum.MESSAGE_KEY.getHeader(), getGrpcMessage(grpcStatus));
+ metadata.put(TripleHeaderEnum.STATUS_KEY.getHeader(), String.valueOf(grpcStatus.code.code));
Status.Builder builder = Status.newBuilder()
.setCode(grpcStatus.code.code)
.setMessage(getGrpcMessage(grpcStatus));
Throwable throwable = grpcStatus.cause;
if (throwable == null) {
+ Status status = builder.build();
+ metadata.put(TripleHeaderEnum.STATUS_DETAIL_KEY.getHeader(),
+ TripleUtil.encodeBase64ASCII(status.toByteArray()));
return metadata;
}
DebugInfo debugInfo = DebugInfo.newBuilder()
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcStatus.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcStatus.java
index 40d6e86..68f8769 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcStatus.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcStatus.java
@@ -142,7 +142,7 @@ public class GrpcStatus {
return encoder.toString().substring(2);
}
- enum Code {
+ public enum Code {
OK(0),
CANCELLED(1),
UNKNOWN(2),
@@ -158,7 +158,12 @@ public class GrpcStatus {
UNIMPLEMENTED(12),
INTERNAL(13),
UNAVAILABLE(14),
- DATA_LOSS(15);
+ DATA_LOSS(15),
+ /**
+ * The request does not have valid authentication credentials for the
+ * operation.
+ */
+ UNAUTHENTICATED(16);
final int code;
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
index 11b0c2d..fabea1c 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
@@ -43,10 +43,8 @@ public class ServerTransportObserver implements TransportObserver {
headerSent = true;
headers.status(OK.codeAsText());
headers.set(TripleHeaderEnum.CONTENT_TYPE_KEY.getHeader(), TripleConstant.CONTENT_PROTO);
- ctx.writeAndFlush(new DefaultHttp2HeadersFrame(headers, endStream));
- } else {
- ctx.writeAndFlush(new DefaultHttp2HeadersFrame(headers, endStream));
}
+ ctx.writeAndFlush(new DefaultHttp2HeadersFrame(headers, endStream));
}
@Override
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
index 2021d43..5b86902 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
@@ -16,6 +16,7 @@
*/
package org.apache.dubbo.rpc.protocol.tri;
+import grpc.health.v1.HealthCheckResponse;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.logger.Logger;
@@ -30,6 +31,7 @@ import org.apache.dubbo.rpc.Protocol;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.protocol.AbstractExporter;
import org.apache.dubbo.rpc.protocol.AbstractProtocol;
+import org.apache.dubbo.rpc.protocol.tri.service.TriBuiltinService;
import java.util.ArrayList;
@@ -47,6 +49,10 @@ public class TripleProtocol extends AbstractProtocol implements Protocol {
private final ExecutorRepository executorRepository = ExtensionLoader.getExtensionLoader(ExecutorRepository.class)
.getDefaultExtension();
+ static {
+ TriBuiltinService.init();
+ }
+
@Override
public int getDefaultPort() {
return 50051;
@@ -71,6 +77,11 @@ public class TripleProtocol extends AbstractProtocol implements Protocol {
pathResolver.add(url.getServiceKey(), invoker);
pathResolver.add(url.getServiceInterface(), invoker);
+
+ // set service status
+ TriBuiltinService.getHealthStatusManager().setStatus(url.getServiceKey(), HealthCheckResponse.ServingStatus.SERVING);
+ TriBuiltinService.getHealthStatusManager().setStatus(url.getServiceInterface(), HealthCheckResponse.ServingStatus.SERVING);
+
PortUnificationExchanger.bind(invoker.getUrl());
return exporter;
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/service/HealthStatusManager.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/service/HealthStatusManager.java
new file mode 100644
index 0000000..c1afe03
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/service/HealthStatusManager.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri.service;
+
+import grpc.health.v1.Health;
+import grpc.health.v1.HealthCheckResponse;
+
+/**
+ * Public facade over {@link TriHealthImpl}: exposes the health service instance and forwards every
+ * status mutation to it.
+ */
+public class HealthStatusManager {
+
+    /**
+     * Reserved "service name" standing for the server as a whole rather than one service. It is the
+     * empty string.
+     */
+    public static final String SERVICE_NAME_ALL_SERVICES = "";
+
+    /** The health service implementation that every call on this manager is forwarded to. */
+    private final TriHealthImpl delegate;
+
+    /**
+     * Creates a manager bound to the given health service implementation.
+     *
+     * @param healthService the implementation that stores the statuses and serves health requests
+     */
+    public HealthStatusManager(TriHealthImpl healthService) {
+        this.delegate = healthService;
+    }
+
+    /**
+     * Returns the health service backing this manager.
+     *
+     * @return the same instance that was passed to the constructor, typed as {@link Health}
+     */
+    public Health getHealthService() {
+        return delegate;
+    }
+
+    /**
+     * Records a new serving status for a service.
+     *
+     * @param service key under which the status is stored; it does not have to match a service that is
+     *                actually exported, and may be the empty string ({@link #SERVICE_NAME_ALL_SERVICES})
+     * @param status  the status to record, e.g. {@link HealthCheckResponse.ServingStatus#SERVING},
+     *                {@link HealthCheckResponse.ServingStatus#NOT_SERVING} or
+     *                {@link HealthCheckResponse.ServingStatus#UNKNOWN}
+     */
+    public void setStatus(String service, HealthCheckResponse.ServingStatus status) {
+        delegate.setStatus(service, status);
+    }
+
+    /**
+     * Removes the recorded status of a service. Afterwards a check for that service fails with
+     * NOT_FOUND.
+     *
+     * @param service key whose status record is dropped; may be the empty string
+     *                ({@link #SERVICE_NAME_ALL_SERVICES})
+     */
+    public void clearStatus(String service) {
+        delegate.clearStatus(service);
+    }
+
+    /**
+     * Switches the health service into its terminal state: every recorded service is marked as not
+     * serving and later status updates are ignored. Intended to be called shortly before server
+     * shutdown so that clients move their traffic elsewhere.
+     */
+    public void enterTerminalState() {
+        delegate.enterTerminalState();
+    }
+}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/service/TriBuiltinService.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/service/TriBuiltinService.java
new file mode 100644
index 0000000..5229aea
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/service/TriBuiltinService.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.tri.service;
+
+import grpc.health.v1.Health;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.constants.CommonConstants;
+import org.apache.dubbo.common.extension.ExtensionLoader;
+import org.apache.dubbo.common.url.component.ServiceConfigURL;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.ProxyFactory;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+import org.apache.dubbo.rpc.model.ServiceDescriptor;
+import org.apache.dubbo.rpc.model.ServiceMetadata;
+import org.apache.dubbo.rpc.model.ServiceRepository;
+import org.apache.dubbo.rpc.protocol.tri.PathResolver;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.dubbo.common.constants.CommonConstants.ANYHOST_VALUE;
+
+/**
+ * Registry of the services that the triple protocol ships itself (comparable to gRPC's built-in
+ * services). At the moment it contains only the health service.
+ **/
+public class TriBuiltinService {
+
+    private static final ProxyFactory PROXY_FACTORY =
+        ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
+
+    private static final PathResolver PATH_RESOLVER = ExtensionLoader.getExtensionLoader(PathResolver.class)
+        .getDefaultExtension();
+
+    private static final ServiceRepository repository = ApplicationModel.getServiceRepository();
+
+    /** Built-in service interface mapped to the instance that implements it. */
+    private static final Map<Class<?>, Object> TRI_SERVICES = new HashMap<>();
+
+    private static final HealthStatusManager HEALTH_STATUS_MANAGER =
+        new HealthStatusManager(new TriHealthImpl());
+
+    /** Flipped to {@code true} by the first {@link #init()} call. */
+    private static final AtomicBoolean init = new AtomicBoolean();
+
+    static {
+        TRI_SERVICES.put(Health.class, HEALTH_STATUS_MANAGER.getHealthService());
+    }
+
+    /**
+     * Registers every built-in service with the service repository and the path resolver. Only the
+     * first invocation does any work; later invocations return immediately.
+     */
+    public static void init() {
+        if (!init.compareAndSet(false, true)) {
+            return;
+        }
+        for (Map.Entry<Class<?>, Object> builtin : TRI_SERVICES.entrySet()) {
+            register(builtin.getKey(), builtin.getValue());
+        }
+    }
+
+    /**
+     * Registers one built-in service: adds it to the repository as service and provider, wraps the
+     * implementation in an invoker and publishes that invoker under both the service key and the
+     * interface name.
+     *
+     * @param serviceType the service interface
+     * @param serviceImpl the object implementing {@code serviceType}
+     */
+    private static void register(Class<?> serviceType, Object serviceImpl) {
+        final String interfaceName = serviceType.getName();
+        ServiceDescriptor descriptor = repository.registerService(serviceType);
+        repository.registerProvider(interfaceName, serviceImpl, descriptor, null, new ServiceMetadata());
+
+        final int port = 0;
+        URL serviceUrl = new ServiceConfigURL(CommonConstants.TRIPLE, null,
+            null, ANYHOST_VALUE, port, interfaceName);
+        Invoker<?> invoker = PROXY_FACTORY.getInvoker(serviceImpl, (Class) serviceType, serviceUrl);
+        PATH_RESOLVER.add(serviceUrl.getServiceKey(), invoker);
+        PATH_RESOLVER.add(serviceUrl.getServiceInterface(), invoker);
+    }
+
+    /**
+     * Returns the manager used to change the statuses reported by the built-in health service.
+     *
+     * @return the single shared {@link HealthStatusManager}
+     */
+    public static HealthStatusManager getHealthStatusManager() {
+        return HEALTH_STATUS_MANAGER;
+    }
+}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/service/TriHealthImpl.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/service/TriHealthImpl.java
new file mode 100644
index 0000000..af313ff
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/service/TriHealthImpl.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri.service;
+
+import grpc.health.v1.Health;
+import grpc.health.v1.HealthCheckRequest;
+import grpc.health.v1.HealthCheckResponse;
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.rpc.protocol.tri.GrpcStatus;
+import org.apache.dubbo.rpc.protocol.tri.TripleRpcException;
+
+import java.util.HashMap;
+import java.util.IdentityHashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * In-memory implementation of the {@link Health} service. It keeps one serving status per service
+ * name, answers unary checks from that table and pushes every status change to the observers that
+ * registered through {@link #watch}.
+ */
+public class TriHealthImpl implements Health {
+
+    private static final Logger logger = Logger.getLogger(TriHealthImpl.class.getName());
+
+    // Due to the latency of rpc calls, synchronization of the map does not help with consistency.
+    // However, a ConcurrentHashMap is needed to allow concurrent reading by check().
+    private final Map<String, HealthCheckResponse.ServingStatus> statusMap = new ConcurrentHashMap<>();
+
+    // Held while 'terminal' or 'watchers' is accessed and around every status change made after
+    // construction.
+    private final Object watchLock = new Object();
+
+    // Indicates if future status changes should be ignored.
+    private boolean terminal;
+
+    // Technically a Multimap<String, StreamObserver<HealthCheckResponse>>. The Boolean value is not
+    // used. The StreamObservers need to be kept in an identity-equality set, to make sure
+    // user-defined equals() doesn't confuse our book-keeping of the StreamObservers. Constructing
+    // such a Multimap would require extra lines and the end result is not significantly simpler, so
+    // the Guava collections dependency is avoided.
+    private final HashMap<String, IdentityHashMap<StreamObserver<HealthCheckResponse>, Boolean>>
+        watchers = new HashMap<>();
+
+    /**
+     * Creates the service with the "all services" entry already marked as SERVING.
+     */
+    public TriHealthImpl() {
+        // Copy of what Go and C++ do.
+        statusMap.put(HealthStatusManager.SERVICE_NAME_ALL_SERVICES, HealthCheckResponse.ServingStatus.SERVING);
+    }
+
+    /**
+     * Returns the currently recorded status of the requested service.
+     *
+     * @param request carries the name of the service to look up
+     * @return a response holding the recorded status
+     * @throws TripleRpcException with code NOT_FOUND when no status is recorded for the service
+     */
+    @Override
+    public HealthCheckResponse check(HealthCheckRequest request) {
+        HealthCheckResponse.ServingStatus status = statusMap.get(request.getService());
+        if (status != null) {
+            return HealthCheckResponse.newBuilder().setStatus(status).build();
+        }
+        throw new TripleRpcException(GrpcStatus.fromCode(GrpcStatus.Code.NOT_FOUND)
+            .withDescription("unknown service " + request.getService()));
+    }
+
+    /**
+     * Sends the current status of the requested service to the observer right away (SERVICE_UNKNOWN
+     * when nothing is recorded) and registers the observer for later status changes of that service.
+     *
+     * @param request          carries the name of the service to watch
+     * @param responseObserver receives the current status and every later change
+     */
+    @Override
+    public void watch(HealthCheckRequest request, StreamObserver<HealthCheckResponse> responseObserver) {
+        final String service = request.getService();
+        synchronized (watchLock) {
+            HealthCheckResponse.ServingStatus status = statusMap.get(service);
+            responseObserver.onNext(getResponseForWatch(status));
+            watchers.computeIfAbsent(service, key -> new IdentityHashMap<>())
+                .put(responseObserver, Boolean.TRUE);
+        }
+        // todo add client cancel listener (until then a registered observer is never removed)
+    }
+
+    /**
+     * Records a status for a service and notifies its watchers when the status actually changed.
+     * Does nothing except log once the terminal state has been entered.
+     *
+     * @param service name under which the status is stored
+     * @param status  the new status
+     */
+    void setStatus(String service, HealthCheckResponse.ServingStatus status) {
+        synchronized (watchLock) {
+            if (terminal) {
+                // java.util.logging formats parameters with MessageFormat-style {0}/{1} placeholders;
+                // a bare "{}" would be printed literally and the arguments dropped.
+                logger.log(Level.FINE, "Ignoring status {0} for {1}", new Object[]{status, service});
+                return;
+            }
+            setStatusInternal(service, status);
+        }
+    }
+
+    /**
+     * Stores the status without checking the terminal flag; watchers are notified only if the stored
+     * value differs from the previous one. Callers in this class hold {@code watchLock}.
+     */
+    private void setStatusInternal(String service, HealthCheckResponse.ServingStatus status) {
+        HealthCheckResponse.ServingStatus prevStatus = statusMap.put(service, status);
+        if (prevStatus != status) {
+            notifyWatchers(service, status);
+        }
+    }
+
+    /**
+     * Removes the recorded status of a service. If a status was recorded, watchers of that service are
+     * told SERVICE_UNKNOWN. Does nothing except log once the terminal state has been entered.
+     *
+     * @param service name whose status record is removed
+     */
+    void clearStatus(String service) {
+        synchronized (watchLock) {
+            if (terminal) {
+                // {0}, not {}: see the note in setStatus.
+                logger.log(Level.FINE, "Ignoring status clearing for {0}", new Object[]{service});
+                return;
+            }
+            HealthCheckResponse.ServingStatus prevStatus = statusMap.remove(service);
+            if (prevStatus != null) {
+                notifyWatchers(service, null);
+            }
+        }
+    }
+
+    /**
+     * Marks every recorded service as NOT_SERVING and makes all later {@link #setStatus} and
+     * {@link #clearStatus} calls no-ops. A second call only logs a warning.
+     */
+    void enterTerminalState() {
+        synchronized (watchLock) {
+            if (terminal) {
+                // The exception is created only to get a stack trace of the redundant caller into the log.
+                logger.log(Level.WARNING, "Already terminating", new RuntimeException());
+                return;
+            }
+            terminal = true;
+            // The flag is already set, so the loop must use setStatusInternal: setStatus would
+            // ignore the update.
+            for (String service : statusMap.keySet()) {
+                setStatusInternal(service, HealthCheckResponse.ServingStatus.NOT_SERVING);
+            }
+        }
+    }
+
+    /**
+     * Sends the given status (SERVICE_UNKNOWN for {@code null}) to every observer watching the
+     * service. Callers in this class hold {@code watchLock}.
+     */
+    private void notifyWatchers(String service, HealthCheckResponse.ServingStatus status) {
+        HealthCheckResponse response = getResponseForWatch(status);
+        IdentityHashMap<StreamObserver<HealthCheckResponse>, Boolean> serviceWatchers =
+            watchers.get(service);
+        if (serviceWatchers != null) {
+            for (StreamObserver<HealthCheckResponse> responseObserver : serviceWatchers.keySet()) {
+                responseObserver.onNext(response);
+            }
+        }
+    }
+
+    /**
+     * Builds the response sent to watchers, mapping a missing record ({@code null}) to SERVICE_UNKNOWN.
+     */
+    private static HealthCheckResponse getResponseForWatch(HealthCheckResponse.ServingStatus recordedStatus) {
+        return HealthCheckResponse.newBuilder().setStatus(
+            recordedStatus == null ? HealthCheckResponse.ServingStatus.SERVICE_UNKNOWN : recordedStatus).build();
+    }
+}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/proto/health.proto b/dubbo-rpc/dubbo-rpc-triple/src/main/proto/health.proto
new file mode 100644
index 0000000..89a27e2
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/proto/health.proto
@@ -0,0 +1,66 @@
+// Copyright 2015 The gRPC Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// The canonical version of this proto can be found at
+// https://github.com/grpc/grpc-proto/blob/master/grpc/health/v1/health.proto
+
+syntax = "proto3";
+
+package grpc.health.v1;
+
+option csharp_namespace = "Grpc.Health.V1";
+option go_package = "google.golang.org/grpc/health/grpc_health_v1";
+option java_multiple_files = true;
+option java_outer_classname = "HealthProto";
+//option java_package = "io.grpc.health.v1";
+// todo change to io.grpc.health.v1 wait stub complete
+option java_package = "grpc.health.v1";
+
+// Request message shared by Check and Watch.
+message HealthCheckRequest {
+ // Name of the service whose serving status is being asked for.
+ string service = 1;
+}
+
+// Response message shared by Check and Watch; carries a single serving status.
+message HealthCheckResponse {
+ // The states a service can be reported in.
+ enum ServingStatus {
+ UNKNOWN = 0;
+ SERVING = 1;
+ NOT_SERVING = 2;
+ SERVICE_UNKNOWN = 3; // Used only by the Watch method.
+ }
+ // Status of the service named in the request.
+ ServingStatus status = 1;
+}
+
+// Health-checking service: a unary Check plus a server-streaming Watch.
+service Health {
+ // If the requested service is unknown, the call will fail with status
+ // NOT_FOUND.
+ rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
+
+ // Performs a watch for the serving status of the requested service.
+ // The server will immediately send back a message indicating the current
+ // serving status. It will then subsequently send a new message whenever
+ // the service's serving status changes.
+ //
+ // If the requested service is unknown when the call is received, the
+ // server will send a message setting the serving status to
+ // SERVICE_UNKNOWN but will *not* terminate the call. If at some
+ // future point, the serving status of the service becomes known, the
+ // server will send a new message with the service's serving status.
+ //
+ // If the call terminates with status UNIMPLEMENTED, then clients
+ // should assume this method is not supported and should not retry the
+ // call. If the call terminates with any other status (including OK),
+ // clients should retry the call with appropriate exponential backoff.
+ rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse);
+}
+
diff --git a/pom.xml b/pom.xml
index 47c666d..234cf9d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -320,6 +320,7 @@
**/istio/v1/auth/IstioCertificateServiceGrpc.java,
**/com/google/rpc/*,
**/generated/**/*,
+ **/grpc/health/**/*,
**/target/**/*,
**/*.json
</excludes>