You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2019/12/20 08:51:39 UTC
[dubbo] branch 2.7.5-release updated: Fixes #5511,
gRPC client does not update channel status accordingly as server
instance status change. (#5515)
This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch 2.7.5-release
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/2.7.5-release by this push:
new 6249f0f Fixes #5511, gRPC client does not update channel status accordingly as server instance status change. (#5515)
6249f0f is described below
commit 6249f0fd506c0fb77cdf1903f7af0c5fa2f5589c
Author: ken.lj <ke...@gmail.com>
AuthorDate: Fri Dec 20 16:51:30 2019 +0800
Fixes #5511, gRPC client does not update channel status accordingly as server instance status change. (#5515)
---
.../dubbo/rpc/protocol/grpc/GrpcInvoker.java | 23 ++++--
.../dubbo/rpc/protocol/grpc/GrpcProtocol.java | 47 ++++++++++--
.../grpc/ReferenceCountManagedChannel.java | 85 ++++++++++++++++++++++
3 files changed, 144 insertions(+), 11 deletions(-)
diff --git a/dubbo-rpc/dubbo-rpc-grpc/src/main/java/org/apache/dubbo/rpc/protocol/grpc/GrpcInvoker.java b/dubbo-rpc/dubbo-rpc-grpc/src/main/java/org/apache/dubbo/rpc/protocol/grpc/GrpcInvoker.java
index 51dab22..c7e0749 100644
--- a/dubbo-rpc/dubbo-rpc-grpc/src/main/java/org/apache/dubbo/rpc/protocol/grpc/GrpcInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-grpc/src/main/java/org/apache/dubbo/rpc/protocol/grpc/GrpcInvoker.java
@@ -23,21 +23,23 @@ import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.protocol.AbstractInvoker;
-import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.StatusException;
+import java.util.concurrent.locks.ReentrantLock;
+
public class GrpcInvoker<T> extends AbstractInvoker<T> {
+ private final ReentrantLock destroyLock = new ReentrantLock();
private final Invoker<T> target;
- private ManagedChannel channel;
+ private ReferenceCountManagedChannel channel;
// private static List<Exception> grpcExceptions = new ArrayList<>();
// static {
// grpcExceptions.add();
// }
- public GrpcInvoker(Class<T> type, URL url, Invoker<T> target, ManagedChannel channel) {
+ public GrpcInvoker(Class<T> type, URL url, Invoker<T> target, ReferenceCountManagedChannel channel) {
super(type, url);
this.target = target;
this.channel = channel;
@@ -75,8 +77,19 @@ public class GrpcInvoker<T> extends AbstractInvoker<T> {
@Override
public void destroy() {
- super.destroy();
- channel.shutdown();
+ if (!super.isDestroyed()) {
+ // double check to avoid dup close
+ destroyLock.lock();
+ try {
+ if (super.isDestroyed()) {
+ return;
+ }
+ super.destroy();
+ channel.shutdown();
+ } finally {
+ destroyLock.unlock();
+ }
+ }
}
private RpcException getRpcException(Class<?> type, URL url, Invocation invocation, Throwable e) {
diff --git a/dubbo-rpc/dubbo-rpc-grpc/src/main/java/org/apache/dubbo/rpc/protocol/grpc/GrpcProtocol.java b/dubbo-rpc/dubbo-rpc-grpc/src/main/java/org/apache/dubbo/rpc/protocol/grpc/GrpcProtocol.java
index afc50db..80e206c 100644
--- a/dubbo-rpc/dubbo-rpc-grpc/src/main/java/org/apache/dubbo/rpc/protocol/grpc/GrpcProtocol.java
+++ b/dubbo-rpc/dubbo-rpc-grpc/src/main/java/org/apache/dubbo/rpc/protocol/grpc/GrpcProtocol.java
@@ -49,8 +49,9 @@ public class GrpcProtocol extends AbstractProxyProtocol {
public final static int DEFAULT_PORT = 50051;
- /* <address, gRPC channel> */
- private final ConcurrentMap<String, ManagedChannel> channelMap = new ConcurrentHashMap<>();
+ /* <address, gRPC channels> */
+ private final ConcurrentMap<String, ReferenceCountManagedChannel> channelMap = new ConcurrentHashMap<>();
+ private final Object lock = new Object();
@Override
protected <T> Runnable doExport(T proxiedImpl, Class<T> type, URL url) throws RpcException {
@@ -113,9 +114,7 @@ public class GrpcProtocol extends AbstractProxyProtocol {
}
// Channel
- ManagedChannel channel = channelMap.computeIfAbsent(url.getAddress(),
- k -> GrpcOptionsUtils.buildManagedChannel(url)
- );
+ ReferenceCountManagedChannel channel = getSharedChannel(url);
// CallOptions
try {
@@ -148,6 +147,41 @@ public class GrpcProtocol extends AbstractProxyProtocol {
throw new UnsupportedOperationException("not used");
}
+ /**
+ * Get shared channel connection
+ */
+ private ReferenceCountManagedChannel getSharedChannel(URL url) {
+ String key = url.getAddress();
+ ReferenceCountManagedChannel channel = channelMap.get(key);
+
+ if (channel != null && !channel.isTerminated()) {
+ channel.incrementAndGetCount();
+ return channel;
+ }
+
+ synchronized (lock) {
+ channel = channelMap.get(key);
+ // double check
+ if (channel != null && !channel.isTerminated()) {
+ channel.incrementAndGetCount();
+ } else {
+ channel = new ReferenceCountManagedChannel(initChannel(url));
+ channelMap.put(key, channel);
+ }
+ }
+
+ return channel;
+ }
+
+ /**
+ * Create new connection
+ *
+ * @param url
+ */
+ private ManagedChannel initChannel(URL url) {
+ return GrpcOptionsUtils.buildManagedChannel(url);
+ }
+
@Override
public int getDefaultPort() {
return DEFAULT_PORT;
@@ -156,9 +190,10 @@ public class GrpcProtocol extends AbstractProxyProtocol {
@Override
public void destroy() {
serverMap.values().forEach(ProtocolServer::close);
- channelMap.values().forEach(ManagedChannel::shutdown);
+ channelMap.values().forEach(ReferenceCountManagedChannel::shutdown);
serverMap.clear();
channelMap.clear();
+ super.destroy();
}
public class GrpcRemotingServer extends RemotingServerAdapter {
diff --git a/dubbo-rpc/dubbo-rpc-grpc/src/main/java/org/apache/dubbo/rpc/protocol/grpc/ReferenceCountManagedChannel.java b/dubbo-rpc/dubbo-rpc-grpc/src/main/java/org/apache/dubbo/rpc/protocol/grpc/ReferenceCountManagedChannel.java
new file mode 100644
index 0000000..3b46f1e
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-grpc/src/main/java/org/apache/dubbo/rpc/protocol/grpc/ReferenceCountManagedChannel.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.grpc;
+
+import io.grpc.CallOptions;
+import io.grpc.ClientCall;
+import io.grpc.ManagedChannel;
+import io.grpc.MethodDescriptor;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Also see ReferenceCountExchangeClient
+ */
+public class ReferenceCountManagedChannel extends ManagedChannel {
+
+ private final AtomicInteger referenceCount = new AtomicInteger(0);
+
+ private ManagedChannel grpcChannel;
+
+ public ReferenceCountManagedChannel(ManagedChannel delegated) {
+ this.grpcChannel = delegated;
+ }
+
+ /**
+ * Increments the reference count of this shared channel; shutdown() closes the underlying gRPC channel once the count drops to zero or below.
+ */
+ public void incrementAndGetCount() {
+ referenceCount.incrementAndGet();
+ }
+
+ @Override
+ public ManagedChannel shutdown() {
+ if (referenceCount.decrementAndGet() <= 0) {
+ return grpcChannel.shutdown();
+ }
+ return grpcChannel;
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return grpcChannel.isShutdown();
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return grpcChannel.isTerminated();
+ }
+
+ @Override
+ public ManagedChannel shutdownNow() {
+ // TODO: implement a forced shutdown; for now this delegates to the reference-counted shutdown()
+ return shutdown();
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+ return grpcChannel.awaitTermination(timeout, unit);
+ }
+
+ @Override
+ public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions) {
+ return grpcChannel.newCall(methodDescriptor, callOptions);
+ }
+
+ @Override
+ public String authority() {
+ return grpcChannel.authority();
+ }
+}