You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by gu...@apache.org on 2021/09/01 05:55:36 UTC

[dubbo] branch 3.0 updated: [3.0-triple] Add tri inner health service (#8644)

This is an automated email from the ASF dual-hosted git repository.

guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 7d84894  [3.0-triple] Add tri inner health service (#8644)
7d84894 is described below

commit 7d848941818709c822b70e2b57606b8f628b25fe
Author: earthchen <yo...@duobei.com>
AuthorDate: Wed Sep 1 00:55:15 2021 -0500

    [3.0-triple] Add tri inner health service (#8644)
    
    * Add tri inner health service
    
    * Ignore generate files
    
    * rename a better class name
    
    Co-authored-by: guohao <gu...@gmail.com>
---
 dubbo-rpc/dubbo-rpc-triple/pom.xml                 |  10 ++
 .../dubbo/rpc/protocol/tri/AbstractStream.java     |  40 +++---
 .../apache/dubbo/rpc/protocol/tri/GrpcStatus.java  |   9 +-
 .../rpc/protocol/tri/ServerTransportObserver.java  |   4 +-
 .../dubbo/rpc/protocol/tri/TripleProtocol.java     |  11 ++
 .../protocol/tri/service/HealthStatusManager.java  |  74 +++++++++++
 .../protocol/tri/service/TriBuiltinService.java    |  86 ++++++++++++
 .../rpc/protocol/tri/service/TriHealthImpl.java    | 145 +++++++++++++++++++++
 .../dubbo-rpc-triple/src/main/proto/health.proto   |  66 ++++++++++
 pom.xml                                            |   1 +
 10 files changed, 420 insertions(+), 26 deletions(-)

diff --git a/dubbo-rpc/dubbo-rpc-triple/pom.xml b/dubbo-rpc/dubbo-rpc-triple/pom.xml
index cd82baf..c99cfb4 100644
--- a/dubbo-rpc/dubbo-rpc-triple/pom.xml
+++ b/dubbo-rpc/dubbo-rpc-triple/pom.xml
@@ -28,6 +28,7 @@
     <description>The triple protocol module</description>
     <properties>
         <skip_maven_deploy>false</skip_maven_deploy>
+        <dubbo.compiler.version>0.0.3-SNAPSHOT</dubbo.compiler.version>
     </properties>
     <dependencies>
         <dependency>
@@ -65,6 +66,15 @@
                 <version>0.6.1</version>
                 <configuration>
                     <protocArtifact>com.google.protobuf:protoc:3.12.0:exe:${os.detected.classifier}</protocArtifact>
+                    <protocPlugins>
+                        <protocPlugin>
+                            <id>dubbo</id>
+                            <groupId>org.apache.dubbo</groupId>
+                            <artifactId>dubbo-compiler</artifactId>
+                            <version>${dubbo.compiler.version}</version>
+                            <mainClass>org.apache.dubbo.gen.dubbo.Dubbo3Generator</mainClass>
+                        </protocPlugin>
+                    </protocPlugins>
                 </configuration>
                 <executions>
                     <execution>
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
index f3e9813..4265de7 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
@@ -185,10 +185,13 @@ public abstract class AbstractStream implements Stream {
         return transportObserver;
     }
 
-    protected void transportError(GrpcStatus status, Map<String, Object> attachments) {
-        // set metadata
-        Metadata metadata = getMetaData(status);
-        getTransportSubscriber().onMetadata(metadata, false);
+    // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#responses
+    protected void transportError(GrpcStatus status, Map<String, Object> attachments, boolean onlyTrailers) {
+        if (!onlyTrailers) {
+            // set metadata
+            Metadata metadata = new DefaultMetadata();
+            getTransportSubscriber().onMetadata(metadata, false);
+        }
         // set trailers
         Metadata trailers = getTrailers(status);
         if (attachments != null) {
@@ -196,31 +199,22 @@ public abstract class AbstractStream implements Stream {
         }
         getTransportSubscriber().onMetadata(trailers, true);
         if (LOGGER.isErrorEnabled()) {
-            LOGGER.error("[Triple-Server-Error] " + status.toMessage());
+            LOGGER.error("[Triple-Server-Error] status=" + status.code.code + " service=" + getServiceDescriptor().getServiceName()
+                + " method=" + getMethodName() +" onlyTrailers=" + onlyTrailers, status.cause);
         }
     }
 
+    protected void transportError(GrpcStatus status, Map<String, Object> attachments) {
+        transportError(status, attachments,false);
+    }
+
     protected void transportError(GrpcStatus status) {
         transportError(status, null);
     }
 
     protected void transportError(Throwable throwable) {
         GrpcStatus status = new GrpcStatus(Code.UNKNOWN, throwable, throwable.getMessage());
-        Metadata metadata = getMetaData(status);
-        getTransportSubscriber().onMetadata(metadata, false);
-        Metadata trailers = getTrailers(status);
-        getTransportSubscriber().onMetadata(trailers, true);
-        if (LOGGER.isErrorEnabled()) {
-            LOGGER.error("[Triple-Server-Error] service=" + getServiceDescriptor().getServiceName()
-                    + " method=" + getMethodName(), throwable);
-        }
-    }
-
-    private Metadata getMetaData(GrpcStatus status) {
-        Metadata metadata = new DefaultMetadata();
-        metadata.put(TripleHeaderEnum.MESSAGE_KEY.getHeader(), getGrpcMessage(status));
-        metadata.put(TripleHeaderEnum.STATUS_KEY.getHeader(), String.valueOf(status.code.code));
-        return metadata;
+        transportError(status, null);
     }
 
     private String getGrpcMessage(GrpcStatus status) {
@@ -234,13 +228,17 @@ public abstract class AbstractStream implements Stream {
     }
 
     private Metadata getTrailers(GrpcStatus grpcStatus) {
-
         Metadata metadata = new DefaultMetadata();
+        metadata.put(TripleHeaderEnum.MESSAGE_KEY.getHeader(), getGrpcMessage(grpcStatus));
+        metadata.put(TripleHeaderEnum.STATUS_KEY.getHeader(), String.valueOf(grpcStatus.code.code));
         Status.Builder builder = Status.newBuilder()
                 .setCode(grpcStatus.code.code)
                 .setMessage(getGrpcMessage(grpcStatus));
         Throwable throwable = grpcStatus.cause;
         if (throwable == null) {
+            Status status = builder.build();
+            metadata.put(TripleHeaderEnum.STATUS_DETAIL_KEY.getHeader(),
+                TripleUtil.encodeBase64ASCII(status.toByteArray()));
             return metadata;
         }
         DebugInfo debugInfo = DebugInfo.newBuilder()
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcStatus.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcStatus.java
index 40d6e86..68f8769 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcStatus.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcStatus.java
@@ -142,7 +142,7 @@ public class GrpcStatus {
         return encoder.toString().substring(2);
     }
 
-    enum Code {
+    public enum Code {
         OK(0),
         CANCELLED(1),
         UNKNOWN(2),
@@ -158,7 +158,12 @@ public class GrpcStatus {
         UNIMPLEMENTED(12),
         INTERNAL(13),
         UNAVAILABLE(14),
-        DATA_LOSS(15);
+        DATA_LOSS(15),
+        /**
+         * The request does not have valid authentication credentials for the
+         * operation.
+         */
+        UNAUTHENTICATED(16);
 
         final int code;
 
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
index 11b0c2d..fabea1c 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
@@ -43,10 +43,8 @@ public class ServerTransportObserver implements TransportObserver {
             headerSent = true;
             headers.status(OK.codeAsText());
             headers.set(TripleHeaderEnum.CONTENT_TYPE_KEY.getHeader(), TripleConstant.CONTENT_PROTO);
-            ctx.writeAndFlush(new DefaultHttp2HeadersFrame(headers, endStream));
-        } else {
-            ctx.writeAndFlush(new DefaultHttp2HeadersFrame(headers, endStream));
         }
+        ctx.writeAndFlush(new DefaultHttp2HeadersFrame(headers, endStream));
     }
 
     @Override
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
index 2021d43..5b86902 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
@@ -16,6 +16,7 @@
  */
 package org.apache.dubbo.rpc.protocol.tri;
 
+import grpc.health.v1.HealthCheckResponse;
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.extension.ExtensionLoader;
 import org.apache.dubbo.common.logger.Logger;
@@ -30,6 +31,7 @@ import org.apache.dubbo.rpc.Protocol;
 import org.apache.dubbo.rpc.RpcException;
 import org.apache.dubbo.rpc.protocol.AbstractExporter;
 import org.apache.dubbo.rpc.protocol.AbstractProtocol;
+import org.apache.dubbo.rpc.protocol.tri.service.TriBuiltinService;
 
 import java.util.ArrayList;
 
@@ -47,6 +49,10 @@ public class TripleProtocol extends AbstractProtocol implements Protocol {
     private final ExecutorRepository executorRepository = ExtensionLoader.getExtensionLoader(ExecutorRepository.class)
             .getDefaultExtension();
 
+    static {
+        TriBuiltinService.init();
+    }
+
     @Override
     public int getDefaultPort() {
         return 50051;
@@ -71,6 +77,11 @@ public class TripleProtocol extends AbstractProtocol implements Protocol {
 
         pathResolver.add(url.getServiceKey(), invoker);
         pathResolver.add(url.getServiceInterface(), invoker);
+
+        // set service status
+        TriBuiltinService.getHealthStatusManager().setStatus(url.getServiceKey(), HealthCheckResponse.ServingStatus.SERVING);
+        TriBuiltinService.getHealthStatusManager().setStatus(url.getServiceInterface(), HealthCheckResponse.ServingStatus.SERVING);
+
         PortUnificationExchanger.bind(invoker.getUrl());
         return exporter;
     }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/service/HealthStatusManager.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/service/HealthStatusManager.java
new file mode 100644
index 0000000..c1afe03
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/service/HealthStatusManager.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri.service;
+
+import grpc.health.v1.Health;
+import grpc.health.v1.HealthCheckResponse;
+
+public class HealthStatusManager {
+
+    /**
+     * The special "service name" that represent all services on a GRPC server.  It is an empty string.
+     */
+    public static final String SERVICE_NAME_ALL_SERVICES = "";
+
+    private final TriHealthImpl healthService;
+
+    public HealthStatusManager(TriHealthImpl healthService) {
+        this.healthService = healthService;
+    }
+
+    public Health getHealthService() {
+        return healthService;
+    }
+
+    /**
+     * Updates the status of the server.
+     *
+     * @param service the name of some aspect of the server that is associated with a health status. This name can have
+     *                no relation with the gRPC services that the server is running with. It can also be an empty String
+     *                {@code ""} per the gRPC specification.
+     * @param status  is one of the values {@link HealthCheckResponse.ServingStatus#SERVING}, {@link
+     *                HealthCheckResponse.ServingStatus#NOT_SERVING} and
+     *                {@link HealthCheckResponse.ServingStatus#UNKNOWN}.
+     */
+    public void setStatus(String service, HealthCheckResponse.ServingStatus status) {
+        healthService.setStatus(service, status);
+    }
+
+    /**
+     * Clears the health status record of a service. The health service will respond with NOT_FOUND error on checking
+     * the status of a cleared service.
+     *
+     * @param service the name of some aspect of the server that is associated with a health status. This name can have
+     *                no relation with the gRPC services that the server is running with. It can also be an empty String
+     *                {@code ""} per the gRPC specification.
+     */
+    public void clearStatus(String service) {
+        healthService.clearStatus(service);
+    }
+
+    /**
+     * enterTerminalState causes the health status manager to mark all services as not serving, and prevents future
+     * updates to services.  This method is meant to be called prior to server shutdown as a way to indicate that
+     * clients should redirect their traffic elsewhere.
+     */
+    public void enterTerminalState() {
+        healthService.enterTerminalState();
+    }
+}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/service/TriBuiltinService.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/service/TriBuiltinService.java
new file mode 100644
index 0000000..5229aea
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/service/TriBuiltinService.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.tri.service;
+
+import grpc.health.v1.Health;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.constants.CommonConstants;
+import org.apache.dubbo.common.extension.ExtensionLoader;
+import org.apache.dubbo.common.url.component.ServiceConfigURL;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.ProxyFactory;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+import org.apache.dubbo.rpc.model.ServiceDescriptor;
+import org.apache.dubbo.rpc.model.ServiceMetadata;
+import org.apache.dubbo.rpc.model.ServiceRepository;
+import org.apache.dubbo.rpc.protocol.tri.PathResolver;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.dubbo.common.constants.CommonConstants.ANYHOST_VALUE;
+
+/**
+ * tri internal  service like grpc internal service
+ **/
+public class TriBuiltinService {
+
+    private static final ProxyFactory PROXY_FACTORY =
+        ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
+
+    private static final PathResolver PATH_RESOLVER = ExtensionLoader.getExtensionLoader(PathResolver.class)
+        .getDefaultExtension();
+
+    private static final ServiceRepository repository = ApplicationModel.getServiceRepository();
+
+    private static final Map<Class<?>, Object> TRI_SERVICES = new HashMap<>();
+
+    private static final HealthStatusManager HEALTH_STATUS_MANAGER;
+
+    private static final AtomicBoolean init = new AtomicBoolean();
+
+    static {
+        HEALTH_STATUS_MANAGER = new HealthStatusManager(new TriHealthImpl());
+        TRI_SERVICES.put(Health.class, HEALTH_STATUS_MANAGER.getHealthService());
+    }
+
+    public static void init() {
+        if (init.compareAndSet(false, true)) {
+            TRI_SERVICES.forEach((clz, impl) -> {
+                ServiceDescriptor serviceDescriptor = repository.registerService(clz);
+                repository.registerProvider(
+                    clz.getName(),
+                    impl,
+                    serviceDescriptor,
+                    null,
+                    new ServiceMetadata()
+                );
+                int port = 0;
+                URL url = new ServiceConfigURL(CommonConstants.TRIPLE, null,
+                    null, ANYHOST_VALUE, port, clz.getName());
+                Invoker<?> invoker = PROXY_FACTORY.getInvoker(impl, (Class) clz, url);
+                PATH_RESOLVER.add(url.getServiceKey(), invoker);
+                PATH_RESOLVER.add(url.getServiceInterface(), invoker);
+            });
+        }
+    }
+
+    public static HealthStatusManager getHealthStatusManager() {
+        return HEALTH_STATUS_MANAGER;
+    }
+}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/service/TriHealthImpl.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/service/TriHealthImpl.java
new file mode 100644
index 0000000..af313ff
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/service/TriHealthImpl.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri.service;
+
+import grpc.health.v1.Health;
+import grpc.health.v1.HealthCheckRequest;
+import grpc.health.v1.HealthCheckResponse;
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.rpc.protocol.tri.GrpcStatus;
+import org.apache.dubbo.rpc.protocol.tri.TripleRpcException;
+
+import java.util.HashMap;
+import java.util.IdentityHashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class TriHealthImpl implements Health {
+
+    private static final Logger logger = Logger.getLogger(TriHealthImpl.class.getName());
+
+    // Due to the latency of rpc calls, synchronization of the map does not help with consistency.
+    // However, need use ConcurrentHashMap to allow concurrent reading by check().
+    private final Map<String, HealthCheckResponse.ServingStatus> statusMap = new ConcurrentHashMap<>();
+
+    private final Object watchLock = new Object();
+
+    // Indicates if future status changes should be ignored.
+    private boolean terminal;
+
+    // Technically a Multimap<String, StreamObserver<HealthCheckResponse>>.  The Boolean value is not
+    // used.  The StreamObservers need to be kept in a identity-equality set, to make sure
+    // user-defined equals() doesn't confuse our book-keeping of the StreamObservers.  Constructing
+    // such Multimap would require extra lines and the end result is not significantly simpler, thus I
+    // would rather not have the Guava collections dependency.
+    private final HashMap<String, IdentityHashMap<StreamObserver<HealthCheckResponse>, Boolean>>
+        watchers = new HashMap<>();
+
+    public TriHealthImpl() {
+        // Copy of what Go and C++ do.
+        statusMap.put(HealthStatusManager.SERVICE_NAME_ALL_SERVICES, HealthCheckResponse.ServingStatus.SERVING);
+    }
+
+    @Override
+    public HealthCheckResponse check(HealthCheckRequest request) {
+        HealthCheckResponse.ServingStatus status = statusMap.get(request.getService());
+        if (status != null) {
+            return HealthCheckResponse.newBuilder().setStatus(status).build();
+        }
+        throw new TripleRpcException(GrpcStatus.fromCode(GrpcStatus.Code.NOT_FOUND)
+            .withDescription("unknown service " + request.getService()));
+    }
+
+    @Override
+    public void watch(HealthCheckRequest request, StreamObserver<HealthCheckResponse> responseObserver) {
+        final String service = request.getService();
+        synchronized (watchLock) {
+            HealthCheckResponse.ServingStatus status = statusMap.get(service);
+            responseObserver.onNext(getResponseForWatch(status));
+            IdentityHashMap<StreamObserver<HealthCheckResponse>, Boolean> serviceWatchers =
+                watchers.get(service);
+            if (serviceWatchers == null) {
+                serviceWatchers = new IdentityHashMap<>();
+                watchers.put(service, serviceWatchers);
+            }
+            serviceWatchers.put(responseObserver, Boolean.TRUE);
+        }
+        // todo add client cancel listener
+    }
+
+    void setStatus(String service, HealthCheckResponse.ServingStatus status) {
+        synchronized (watchLock) {
+            if (terminal) {
+                logger.log(Level.FINE, "Ignoring status {} for {}", new Object[]{status, service});
+                return;
+            }
+            setStatusInternal(service, status);
+        }
+    }
+
+    private void setStatusInternal(String service, HealthCheckResponse.ServingStatus status) {
+        HealthCheckResponse.ServingStatus prevStatus = statusMap.put(service, status);
+        if (prevStatus != status) {
+            notifyWatchers(service, status);
+        }
+    }
+
+    void clearStatus(String service) {
+        synchronized (watchLock) {
+            if (terminal) {
+                logger.log(Level.FINE, "Ignoring status clearing for {}", new Object[]{service});
+                return;
+            }
+            HealthCheckResponse.ServingStatus prevStatus = statusMap.remove(service);
+            if (prevStatus != null) {
+                notifyWatchers(service, null);
+            }
+        }
+    }
+
+    void enterTerminalState() {
+        synchronized (watchLock) {
+            if (terminal) {
+                logger.log(Level.WARNING, "Already terminating", new RuntimeException());
+                return;
+            }
+            terminal = true;
+            for (String service : statusMap.keySet()) {
+                setStatusInternal(service, HealthCheckResponse.ServingStatus.NOT_SERVING);
+            }
+        }
+    }
+
+    private void notifyWatchers(String service, HealthCheckResponse.ServingStatus status) {
+        HealthCheckResponse response = getResponseForWatch(status);
+        IdentityHashMap<StreamObserver<HealthCheckResponse>, Boolean> serviceWatchers =
+            watchers.get(service);
+        if (serviceWatchers != null) {
+            for (StreamObserver<HealthCheckResponse> responseObserver : serviceWatchers.keySet()) {
+                responseObserver.onNext(response);
+            }
+        }
+    }
+
+    private static HealthCheckResponse getResponseForWatch(HealthCheckResponse.ServingStatus recordedStatus) {
+        return HealthCheckResponse.newBuilder().setStatus(
+            recordedStatus == null ? HealthCheckResponse.ServingStatus.SERVICE_UNKNOWN : recordedStatus).build();
+    }
+}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/proto/health.proto b/dubbo-rpc/dubbo-rpc-triple/src/main/proto/health.proto
new file mode 100644
index 0000000..89a27e2
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/proto/health.proto
@@ -0,0 +1,66 @@
+// Copyright 2015 The gRPC Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// The canonical version of this proto can be found at
+// https://github.com/grpc/grpc-proto/blob/master/grpc/health/v1/health.proto
+
+syntax = "proto3";
+
+package grpc.health.v1;
+
+option csharp_namespace = "Grpc.Health.V1";
+option go_package = "google.golang.org/grpc/health/grpc_health_v1";
+option java_multiple_files = true;
+option java_outer_classname = "HealthProto";
+//option java_package = "io.grpc.health.v1";
+// todo change to io.grpc.health.v1 wait stub complete
+option java_package = "grpc.health.v1";
+
+message HealthCheckRequest {
+  string service = 1;
+}
+
+message HealthCheckResponse {
+  enum ServingStatus {
+    UNKNOWN = 0;
+    SERVING = 1;
+    NOT_SERVING = 2;
+    SERVICE_UNKNOWN = 3;  // Used only by the Watch method.
+  }
+  ServingStatus status = 1;
+}
+
+service Health {
+  // If the requested service is unknown, the call will fail with status
+  // NOT_FOUND.
+  rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
+
+  // Performs a watch for the serving status of the requested service.
+  // The server will immediately send back a message indicating the current
+  // serving status.  It will then subsequently send a new message whenever
+  // the service's serving status changes.
+  //
+  // If the requested service is unknown when the call is received, the
+  // server will send a message setting the serving status to
+  // SERVICE_UNKNOWN but will *not* terminate the call.  If at some
+  // future point, the serving status of the service becomes known, the
+  // server will send a new message with the service's serving status.
+  //
+  // If the call terminates with status UNIMPLEMENTED, then clients
+  // should assume this method is not supported and should not retry the
+  // call.  If the call terminates with any other status (including OK),
+  // clients should retry the call with appropriate exponential backoff.
+  rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse);
+}
+
diff --git a/pom.xml b/pom.xml
index 47c666d..234cf9d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -320,6 +320,7 @@
                                         **/istio/v1/auth/IstioCertificateServiceGrpc.java,
                                         **/com/google/rpc/*,
                                         **/generated/**/*,
+                                        **/grpc/health/**/*,
                                         **/target/**/*,
                                         **/*.json
                                     </excludes>