You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2019/12/20 08:51:39 UTC

[dubbo] branch 2.7.5-release updated: Fixes #5511, gRPC client does not update channel status accordingly as server instance status change. (#5515)

This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch 2.7.5-release
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/2.7.5-release by this push:
     new 6249f0f  Fixes #5511, gRPC client does not update channel status accordingly as server instance status change. (#5515)
6249f0f is described below

commit 6249f0fd506c0fb77cdf1903f7af0c5fa2f5589c
Author: ken.lj <ke...@gmail.com>
AuthorDate: Fri Dec 20 16:51:30 2019 +0800

    Fixes #5511, gRPC client does not update channel status accordingly as server instance status change. (#5515)
---
 .../dubbo/rpc/protocol/grpc/GrpcInvoker.java       | 23 ++++--
 .../dubbo/rpc/protocol/grpc/GrpcProtocol.java      | 47 ++++++++++--
 .../grpc/ReferenceCountManagedChannel.java         | 85 ++++++++++++++++++++++
 3 files changed, 144 insertions(+), 11 deletions(-)

diff --git a/dubbo-rpc/dubbo-rpc-grpc/src/main/java/org/apache/dubbo/rpc/protocol/grpc/GrpcInvoker.java b/dubbo-rpc/dubbo-rpc-grpc/src/main/java/org/apache/dubbo/rpc/protocol/grpc/GrpcInvoker.java
index 51dab22..c7e0749 100644
--- a/dubbo-rpc/dubbo-rpc-grpc/src/main/java/org/apache/dubbo/rpc/protocol/grpc/GrpcInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-grpc/src/main/java/org/apache/dubbo/rpc/protocol/grpc/GrpcInvoker.java
@@ -23,21 +23,23 @@ import org.apache.dubbo.rpc.Result;
 import org.apache.dubbo.rpc.RpcException;
 import org.apache.dubbo.rpc.protocol.AbstractInvoker;
 
-import io.grpc.ManagedChannel;
 import io.grpc.Status;
 import io.grpc.StatusException;
 
+import java.util.concurrent.locks.ReentrantLock;
+
 public class GrpcInvoker<T> extends AbstractInvoker<T> {
+    private final ReentrantLock destroyLock = new ReentrantLock();
 
     private final Invoker<T> target;
-    private ManagedChannel channel;
+    private ReferenceCountManagedChannel channel;
 
 //    private static List<Exception> grpcExceptions = new ArrayList<>();
 //    static {
 //        grpcExceptions.add();
 //    }
 
-    public GrpcInvoker(Class<T> type, URL url, Invoker<T> target, ManagedChannel channel) {
+    public GrpcInvoker(Class<T> type, URL url, Invoker<T> target, ReferenceCountManagedChannel channel) {
         super(type, url);
         this.target = target;
         this.channel = channel;
@@ -75,8 +77,19 @@ public class GrpcInvoker<T> extends AbstractInvoker<T> {
 
     @Override
     public void destroy() {
-        super.destroy();
-        channel.shutdown();
+        if (!super.isDestroyed()) {
+            // double check to avoid dup close
+            destroyLock.lock();
+            try {
+                if (super.isDestroyed()) {
+                    return;
+                }
+                super.destroy();
+                channel.shutdown();
+            } finally {
+                destroyLock.unlock();
+            }
+        }
     }
 
     private RpcException getRpcException(Class<?> type, URL url, Invocation invocation, Throwable e) {
diff --git a/dubbo-rpc/dubbo-rpc-grpc/src/main/java/org/apache/dubbo/rpc/protocol/grpc/GrpcProtocol.java b/dubbo-rpc/dubbo-rpc-grpc/src/main/java/org/apache/dubbo/rpc/protocol/grpc/GrpcProtocol.java
index afc50db..80e206c 100644
--- a/dubbo-rpc/dubbo-rpc-grpc/src/main/java/org/apache/dubbo/rpc/protocol/grpc/GrpcProtocol.java
+++ b/dubbo-rpc/dubbo-rpc-grpc/src/main/java/org/apache/dubbo/rpc/protocol/grpc/GrpcProtocol.java
@@ -49,8 +49,9 @@ public class GrpcProtocol extends AbstractProxyProtocol {
 
     public final static int DEFAULT_PORT = 50051;
 
-    /* <address, gRPC channel> */
-    private final ConcurrentMap<String, ManagedChannel> channelMap = new ConcurrentHashMap<>();
+    /* <address, gRPC channels> */
+    private final ConcurrentMap<String, ReferenceCountManagedChannel> channelMap = new ConcurrentHashMap<>();
+    private final Object lock = new Object();
 
     @Override
     protected <T> Runnable doExport(T proxiedImpl, Class<T> type, URL url) throws RpcException {
@@ -113,9 +114,7 @@ public class GrpcProtocol extends AbstractProxyProtocol {
         }
 
         // Channel
-        ManagedChannel channel = channelMap.computeIfAbsent(url.getAddress(),
-                k -> GrpcOptionsUtils.buildManagedChannel(url)
-        );
+        ReferenceCountManagedChannel channel = getSharedChannel(url);
 
         // CallOptions
         try {
@@ -148,6 +147,41 @@ public class GrpcProtocol extends AbstractProxyProtocol {
         throw new UnsupportedOperationException("not used");
     }
 
+    /**
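+     * Get the channel shared by all references to the URL's address, creating a new one if none is cached or the cached one is terminated.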
+     */
+    private ReferenceCountManagedChannel getSharedChannel(URL url) {
+        String key = url.getAddress();
+        ReferenceCountManagedChannel channel = channelMap.get(key);
+
+        if (channel != null && !channel.isTerminated()) {
+            channel.incrementAndGetCount();
+            return channel;
+        }
+
+        synchronized (lock) {
+            channel = channelMap.get(key);
+            // double check
+            if (channel != null && !channel.isTerminated()) {
+                channel.incrementAndGetCount();
+            } else {
+                channel = new ReferenceCountManagedChannel(initChannel(url));
+                channelMap.put(key, channel);
+            }
+        }
+
+        return channel;
+    }
+
+    /**
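+     * Create a new gRPC channel by delegating to GrpcOptionsUtils.buildManagedChannel.
+     *
+     * @param url the URL passed to GrpcOptionsUtils.buildManagedChannel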
+     */
+    private ManagedChannel initChannel(URL url) {
+        return GrpcOptionsUtils.buildManagedChannel(url);
+    }
+
     @Override
     public int getDefaultPort() {
         return DEFAULT_PORT;
@@ -156,9 +190,10 @@ public class GrpcProtocol extends AbstractProxyProtocol {
     @Override
     public void destroy() {
         serverMap.values().forEach(ProtocolServer::close);
-        channelMap.values().forEach(ManagedChannel::shutdown);
+        channelMap.values().forEach(ReferenceCountManagedChannel::shutdown);
         serverMap.clear();
         channelMap.clear();
+        super.destroy();
     }
 
     public class GrpcRemotingServer extends RemotingServerAdapter {
diff --git a/dubbo-rpc/dubbo-rpc-grpc/src/main/java/org/apache/dubbo/rpc/protocol/grpc/ReferenceCountManagedChannel.java b/dubbo-rpc/dubbo-rpc-grpc/src/main/java/org/apache/dubbo/rpc/protocol/grpc/ReferenceCountManagedChannel.java
new file mode 100644
index 0000000..3b46f1e
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-grpc/src/main/java/org/apache/dubbo/rpc/protocol/grpc/ReferenceCountManagedChannel.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.grpc;
+
+import io.grpc.CallOptions;
+import io.grpc.ClientCall;
+import io.grpc.ManagedChannel;
+import io.grpc.MethodDescriptor;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Also see ReferenceCountExchangeClient
+ */
+public class ReferenceCountManagedChannel extends ManagedChannel {
+
+    private final AtomicInteger referenceCount = new AtomicInteger(0);
+
+    private ManagedChannel grpcChannel;
+
+    public ReferenceCountManagedChannel(ManagedChannel delegated) {
+        this.grpcChannel = delegated;
+    }
+
+    /**
+     * Increments the reference count of this shared gRPC channel; shutdown() decrements it and closes the underlying channel once the count drops to zero or below.
+     */
+    public void incrementAndGetCount() {
+        referenceCount.incrementAndGet();
+    }
+
+    @Override
+    public ManagedChannel shutdown() {
+        if (referenceCount.decrementAndGet() <= 0) {
+            return grpcChannel.shutdown();
+        }
+        return grpcChannel;
+    }
+
+    @Override
+    public boolean isShutdown() {
+        return grpcChannel.isShutdown();
+    }
+
+    @Override
+    public boolean isTerminated() {
+        return grpcChannel.isTerminated();
+    }
+
+    @Override
+    public ManagedChannel shutdownNow() {
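+        // TODO: currently goes through the reference-counted shutdown() rather than calling shutdownNow() on the underlying channel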
+        return shutdown();
+    }
+
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+        return grpcChannel.awaitTermination(timeout, unit);
+    }
+
+    @Override
+    public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions) {
+        return grpcChannel.newCall(methodDescriptor, callOptions);
+    }
+
+    @Override
+    public String authority() {
+        return grpcChannel.authority();
+    }
+}