You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2021/02/02 07:08:51 UTC
[dubbo] branch 3.0 updated: Add :authority and :scheme to triple headers (#7156)
This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new c132438 Add :authority and :scheme to triple headers (#7156)
c132438 is described below
commit c132438c01dba18d6d0479b591b29753bbe4d204
Author: GuoHao <gu...@gmail.com>
AuthorDate: Tue Feb 2 15:08:17 2021 +0800
Add :authority and :scheme to triple headers (#7156)
---
.../org/apache/dubbo/remoting/api/Connection.java | 179 ++++-----------------
.../dubbo/remoting/api/ConnectionHandler.java | 67 +++-----
.../api/SingleProtocolConnectionManager.java | 8 +-
.../dubbo/rpc/protocol/tri/ClientStream.java | 16 +-
.../dubbo/rpc/protocol/tri/TripleInvoker.java | 1 -
.../dubbo/rpc/protocol/tri/TripleProtocol.java | 13 +-
6 files changed, 83 insertions(+), 201 deletions(-)
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/Connection.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/Connection.java
index 85777cf..49ba1ae 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/Connection.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/Connection.java
@@ -17,7 +17,6 @@
package org.apache.dubbo.remoting.api;
import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.Version;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
@@ -26,34 +25,30 @@ import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.remoting.RemotingException;
-import org.apache.dubbo.remoting.utils.UrlUtils;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.AbstractReferenceCounted;
import io.netty.util.AttributeKey;
import io.netty.util.HashedWheelTimer;
+import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultPromise;
-import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GlobalEventExecutor;
-import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_CLIENT_THREADPOOL;
import static org.apache.dubbo.common.constants.CommonConstants.THREADPOOL_KEY;
import static org.apache.dubbo.remoting.api.NettyEventLoopFactory.socketChannelClass;
@@ -65,27 +60,23 @@ public class Connection extends AbstractReferenceCounted implements ReferenceCou
public static final AttributeKey<Connection> CONNECTION = AttributeKey.valueOf("connection");
private static final Logger logger = LoggerFactory.getLogger(Connection.class);
private final URL url;
- private final Bootstrap bootstrap;
private final int connectTimeout;
private final WireProtocol protocol;
private final Promise<Void> closeFuture;
private final InetSocketAddress remote;
- private final AtomicReference<ConnectionStatus> status;
- private volatile Channel channel;
- private volatile Future<Channel> connectFuture;
- private int retryAttempts = 1;
- private long lastReconnectTime;
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+ private final AtomicReference<Channel> channel = new AtomicReference<>();
public Connection(URL url) {
url = ExecutorUtil.setThreadName(url, "DubboClientHandler");
url = url.addParameterIfAbsent(THREADPOOL_KEY, DEFAULT_CLIENT_THREADPOOL);
this.url = url;
- this.status = new AtomicReference<>(ConnectionStatus.DISCONNECTED);
this.protocol = ExtensionLoader.getExtensionLoader(WireProtocol.class).getExtension(url.getProtocol());
this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT);
this.closeFuture = new DefaultPromise<>(GlobalEventExecutor.INSTANCE);
this.remote = getConnectAddress();
- this.bootstrap = open();
+ final Bootstrap bootstrap = open();
+ bootstrap.connect();
}
public static Connection getConnectionFromChannel(Channel channel) {
@@ -97,16 +88,13 @@ public class Connection extends AbstractReferenceCounted implements ReferenceCou
}
- public void init() throws RemotingException {
- concurrentConnect();
- }
-
public Bootstrap open() {
final Bootstrap bootstrap = new Bootstrap();
bootstrap.group(NettyEventLoopFactory.NIO_EVENT_LOOP_GROUP)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+ .remoteAddress(getConnectAddress())
.channel(socketChannelClass());
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(3000, connectTimeout));
@@ -116,11 +104,11 @@ public class Connection extends AbstractReferenceCounted implements ReferenceCou
@Override
protected void initChannel(SocketChannel ch) {
ch.attr(CONNECTION).set(Connection.this);
- Connection.this.channel = ch;
- int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
// TODO support SSL
final ChannelPipeline p = ch.pipeline();//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
- p.addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS));
+ // TODO support IDLE
+// int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
+// p.addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS));
p.addLast(connectionHandler);
// TODO support ssl
protocol.configClientPipeline(p, null);
@@ -131,66 +119,43 @@ public class Connection extends AbstractReferenceCounted implements ReferenceCou
}
public Channel getChannel() {
- return channel;
+ return channel.get();
}
@Override
public String toString() {
- return getRemote() + ",channel=" + channel + ", status=" + status;
- }
-
- public void onIdle() {
- if (logger.isDebugEnabled()) {
- logger.debug(String.format("Connection:%s disconnected cause idle", this));
- }
- setStatus(ConnectionStatus.DISCONNECTED);
- this.channel = null;
+ return "(Ref=" + ReferenceCountUtil.refCnt(this) + ",local="+(getChannel()==null?null:getChannel().localAddress())+",remote="+getRemote();
}
- public boolean onDisConnected() {
- if (logger.isDebugEnabled()) {
- logger.debug(String.format("Connection:%s disconnected ", this));
+ public void onGoaway(Channel channel) {
+ if (this.channel.compareAndSet(channel, null)) {
+ if (logger.isInfoEnabled()) {
+ logger.info(String.format("Connection:%s goaway", this));
+ }
}
- return setStatus(ConnectionStatus.DISCONNECTED);
}
public void onConnected(Channel channel) {
- setStatus(ConnectionStatus.CONNECTED);
- this.channel = channel;
+ this.channel.set(channel);
channel.attr(CONNECTION).set(this);
- if (logger.isDebugEnabled()) {
- logger.debug(String.format("Connection:%s connected ", this));
+ if (logger.isInfoEnabled()) {
+ logger.info(String.format("Connection:%s connected ", this));
}
}
public boolean isAvailable() {
- return ConnectionStatus.CONNECTED == getStatus() && channel != null && channel.isActive();
- }
-
- public ConnectionStatus getStatus() {
- return status.get();
- }
-
- public boolean setStatus(ConnectionStatus status) {
- return this.status.getAndSet(status) != status;
+ final Channel channel = getChannel();
+ return channel != null && channel.isActive();
}
public boolean isClosed() {
- return getStatus() == ConnectionStatus.CLOSED;
- }
-
- public void close() {
- setStatus(ConnectionStatus.CLOSED);
- if (channel != null) {
- channel.close();
- }
+ return closed.get();
}
public ChannelFuture write(Object request) throws RemotingException {
- if (channel == null || !channel.isActive()) {
+ if (!isAvailable()) {
throw new RemotingException(null, null, "Failed to send request " + request + ", cause: The channel to " + remote + " is closed!");
}
-
return getChannel().writeAndFlush(request);
}
@@ -200,13 +165,20 @@ public class Connection extends AbstractReferenceCounted implements ReferenceCou
@Override
protected void deallocate() {
- setStatus(ConnectionStatus.CLOSED);
- if (channel != null) {
- channel.close();
+ if (closed.compareAndSet(false, true)) {
+ close();
}
closeFuture.setSuccess(null);
}
+ public void close() {
+ final Channel current = this.channel.get();
+ if (current != null) {
+ current.close();
+ }
+ this.channel.set(null);
+ }
+
@Override
public ReferenceCounted touch(Object hint) {
return this;
@@ -216,80 +188,6 @@ public class Connection extends AbstractReferenceCounted implements ReferenceCou
return new InetSocketAddress(NetUtils.filterLocalHost(getUrl().getHost()), getUrl().getPort());
}
- protected Future<Channel> connectAsync() {
- final Promise<Channel> promise = ImmediateEventExecutor.INSTANCE.newPromise();
- ChannelFuture future = bootstrap.connect(getRemote());
- final ChannelFutureListener listener = new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) {
- if (bootstrap.config().group().isShuttingDown()) {
- promise.tryFailure(new IllegalStateException("Client is shutdown"));
- return;
- }
- if (future.isSuccess()) {
- if (logger.isInfoEnabled()) {
- logger.info("Succeed connect to server " + future.channel().remoteAddress() + " from " + getClass().getSimpleName() + " "
- + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
- + ", channel is " + future.channel());
- }
- bootstrap.config().group().execute(() -> promise.setSuccess(future.channel()));
- } else {
- bootstrap.config().group().execute(() -> {
- final RemotingException cause = new RemotingException(null, getRemote(), "client(url: " + getUrl() + ") failed to connect to server "
- + future.channel().remoteAddress() + ", error message is:" + future.cause().getMessage(), future.cause());
- promise.tryFailure(cause);
- });
- }
- Connection.this.connectFuture = null;
- }
- };
- future.addListener(listener);
- return promise;
- }
-
-
- private void concurrentConnect() throws RemotingException {
- if (channel != null && channel.isActive()) {
- return;
- }
- synchronized (this) {
- if (channel != null && channel.isActive()) {
- return;
- }
- if (connectFuture != null) {
- connectFuture.awaitUninterruptibly(getConnectTimeout());
- } else {
- int backoff = 2 << retryAttempts;
- if (System.currentTimeMillis() - lastReconnectTime < backoff) {
- return;
- }
- connectWithGuard();
- }
- }
- }
-
- protected synchronized void connectWithGuard() throws RemotingException {
- this.lastReconnectTime = System.currentTimeMillis();
- this.retryAttempts = Math.min(12, retryAttempts + 1);
- long start = System.currentTimeMillis();
- final Future<Channel> connectFuture = connectAsync();
- this.connectFuture = connectFuture;
- connectFuture.awaitUninterruptibly(getConnectTimeout());
- if (!connectFuture.isSuccess()) {
- if (connectFuture.isDone()) {
- throw new RemotingException(null, getRemote(), "client(url: " + getUrl() + ") failed to connect to server .error message is:" + connectFuture.cause().getMessage(),
- connectFuture.cause());
- } else {
- throw new RemotingException(null, getRemote(), "client(url: " + getUrl() + ") failed to connect to server. client-side timeout "
- + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
- + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
- }
- } else {
- this.retryAttempts = 0;
- }
- }
-
-
/**
* get url.
*
@@ -302,14 +200,5 @@ public class Connection extends AbstractReferenceCounted implements ReferenceCou
private int getConnectTimeout() {
return connectTimeout;
}
-
- public enum ConnectionStatus {
- DISCONNECTED,
- CONNECTING,
- CONNECTED,
- READ_ONLY,
- CLOSED
- }
-
}
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/ConnectionHandler.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/ConnectionHandler.java
index de5c151..23b36d7 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/ConnectionHandler.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/ConnectionHandler.java
@@ -25,11 +25,9 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.AttributeKey;
import io.netty.util.Timer;
-import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@ChannelHandler.Sharable
@@ -37,11 +35,10 @@ public class ConnectionHandler extends ChannelInboundHandlerAdapter {
private static final Logger log = LoggerFactory.getLogger(ConnectionHandler.class);
private static final int MIN_FAST_RECONNECT_INTERVAL = 4000;
- private static final int BACKOFF_CAP = 12;
+ private static final int BACKOFF_CAP = 15;
private static final AttributeKey<Boolean> GO_AWAY_KEY = AttributeKey.valueOf("dubbo_channel_goaway");
private final Timer timer;
private final Bootstrap bootstrap;
- private final Semaphore permit = new Semaphore(1);
private volatile long lastReconnect;
public ConnectionHandler(Bootstrap bootstrap, Timer timer) {
@@ -53,8 +50,9 @@ public class ConnectionHandler extends ChannelInboundHandlerAdapter {
channel.attr(GO_AWAY_KEY).set(true);
final Connection connection = Connection.getConnectionFromChannel(channel);
if (connection != null) {
- connection.onIdle();
+ connection.onGoaway(channel);
}
+ tryReconnect(connection);
}
@Override
@@ -78,18 +76,6 @@ public class ConnectionHandler extends ChannelInboundHandlerAdapter {
}
- @Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
- super.userEventTriggered(ctx, evt);
- if (evt instanceof IdleStateEvent) {
- final Connection connection = Connection.getConnectionFromChannel(ctx.channel());
- if (connection != null) {
- connection.onIdle();
- ctx.close();
- }
- }
- }
-
private boolean isGoAway(Channel channel) {
return Boolean.TRUE.equals(channel.attr(GO_AWAY_KEY).get());
}
@@ -98,25 +84,28 @@ public class ConnectionHandler extends ChannelInboundHandlerAdapter {
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// Reconnect event will be triggered by Connection.init();
if (isGoAway(ctx.channel())) {
+ ctx.fireChannelInactive();
return;
}
Connection connection = Connection.getConnectionFromChannel(ctx.channel());
- if (connection != null) {
- if (!connection.isClosed() && connection.onDisConnected()) {
- if (shouldFastReconnect()) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Connection %s inactive, schedule fast reconnect", connection));
- }
- reconnect(connection, 1);
- } else {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Connection %s inactive, schedule normal reconnect", connection));
- }
- reconnect(connection, BACKOFF_CAP);
+ tryReconnect(connection);
+ ctx.fireChannelInactive();
+ }
+
+ private void tryReconnect(Connection connection) {
+ if (connection != null && !connection.isClosed()) {
+ if (shouldFastReconnect()) {
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Connection %s inactive, schedule fast reconnect", connection));
}
+ reconnect(connection, 1);
+ } else {
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Connection %s inactive, schedule normal reconnect", connection));
+ }
+ reconnect(connection, BACKOFF_CAP);
}
}
- ctx.fireChannelInactive();
}
private void reconnect(final Connection connection, final int attempts) {
@@ -127,20 +116,18 @@ public class ConnectionHandler extends ChannelInboundHandlerAdapter {
return;
}
- if (permit.tryAcquire()) {
- timer.newTimeout(timeout1 -> tryReconnect(connection, Math.min(BACKOFF_CAP, attempts + 1)), timeout, TimeUnit.MILLISECONDS);
- }
+ int nextAttempt = Math.min(BACKOFF_CAP, attempts + 1);
+ timer.newTimeout(timeout1 -> tryReconnect(connection, nextAttempt), timeout, TimeUnit.MILLISECONDS);
}
private void tryReconnect(final Connection connection, final int nextAttempt) {
- permit.release();
if (connection.isClosed() || bootstrap.config().group().isShuttingDown()) {
return;
}
- if (log.isDebugEnabled()) {
- log.debug(String.format("Connection %s is reconnecting, attempt=%d", connection, nextAttempt));
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Connection %s is reconnecting, attempt=%d", connection, nextAttempt));
}
bootstrap.connect(connection.getRemote()).addListener((ChannelFutureListener) future -> {
@@ -157,16 +144,14 @@ public class ConnectionHandler extends ChannelInboundHandlerAdapter {
if (future.isSuccess()) {
final Channel channel = future.channel();
- connection.onConnected(channel);
if (!connection.isClosed()) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("%s connected to %s", connection, connection.getRemote()));
- }
+ connection.onConnected(channel);
} else {
channel.close();
}
+ } else {
+ reconnect(connection, nextAttempt);
}
- reconnect(connection, nextAttempt);
});
}
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/SingleProtocolConnectionManager.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/SingleProtocolConnectionManager.java
index a4f8e07..e662e65 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/SingleProtocolConnectionManager.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/SingleProtocolConnectionManager.java
@@ -24,8 +24,6 @@ import io.netty.util.internal.PlatformDependent;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
-import static org.apache.dubbo.common.constants.CommonConstants.LAZY_CONNECT_KEY;
-
public class SingleProtocolConnectionManager implements ConnectionManager {
private final ConcurrentMap<String, Connection> connections = PlatformDependent.newConcurrentHashMap();
@@ -34,7 +32,7 @@ public class SingleProtocolConnectionManager implements ConnectionManager {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
- final Connection connection = connections.compute(url.getAddress(), (address, conn) -> {
+ return connections.compute(url.getAddress(), (address, conn) -> {
if (conn == null) {
final Connection created = new Connection(url);
created.getCloseFuture().addListener(future -> connections.remove(address, created));
@@ -44,10 +42,6 @@ public class SingleProtocolConnectionManager implements ConnectionManager {
return conn;
}
});
- if (!url.getParameter(LAZY_CONNECT_KEY, false)) {
- connection.init();
- }
- return connection;
}
@Override
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
index 1e72180..69ced1c 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
@@ -18,8 +18,6 @@ package org.apache.dubbo.rpc.protocol.tri;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
-import org.apache.dubbo.common.logger.Logger;
-import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.remoting.api.Connection;
import org.apache.dubbo.remoting.exchange.Request;
@@ -48,6 +46,7 @@ import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2NoMoreStreamIdsException;
import io.netty.handler.codec.http2.Http2StreamChannel;
import io.netty.handler.codec.http2.Http2StreamChannelBootstrap;
+import io.netty.util.AsciiString;
import java.io.IOException;
import java.io.InputStream;
@@ -57,18 +56,19 @@ import static org.apache.dubbo.rpc.Constants.CONSUMER_MODEL;
import static org.apache.dubbo.rpc.protocol.tri.TripleUtil.responseErr;
public class ClientStream extends AbstractStream implements Stream {
- private static final Logger LOGGER = LoggerFactory.getLogger(ClientStream.class);
private static final GrpcStatus MISSING_RESP = GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
.withDescription("Missing Response");
+ private static final AsciiString SCHEME = AsciiString.of("http");
+ private final String authority;
private final Request request;
private final RpcInvocation invocation;
-
public ClientStream(URL url, ChannelHandlerContext ctx, boolean needWrap, Request request) {
super(url, ctx, needWrap);
if (needWrap) {
setSerializeType((String) ((RpcInvocation) (request.getData())).getObjectAttachment(Constants.SERIALIZATION_KEY));
}
+ this.authority = url.getAddress();
this.request = request;
this.invocation = (RpcInvocation) request.getData();
}
@@ -97,7 +97,12 @@ public class ClientStream extends AbstractStream implements Stream {
@Override
public void write(Object obj, ChannelPromise promise) throws IOException {
+ final Http2StreamChannelBootstrap streamChannelBootstrap = new Http2StreamChannelBootstrap(getCtx().channel());
+ final Http2StreamChannel streamChannel = streamChannelBootstrap.open().syncUninterruptibly().getNow();
+
Http2Headers headers = new DefaultHttp2Headers()
+ .authority(authority)
+ .scheme(SCHEME)
.method(HttpMethod.POST.asciiName())
.path("/" + invocation.getObjectAttachment(CommonConstants.PATH_KEY) + "/" + invocation.getMethodName())
.set(HttpHeaderNames.CONTENT_TYPE, TripleConstant.CONTENT_PROTO)
@@ -127,8 +132,7 @@ public class ClientStream extends AbstractStream implements Stream {
DefaultHttp2HeadersFrame frame = new DefaultHttp2HeadersFrame(headers);
final TripleHttp2ClientResponseHandler responseHandler = new TripleHttp2ClientResponseHandler();
- final Http2StreamChannelBootstrap streamChannelBootstrap = new Http2StreamChannelBootstrap(getCtx().channel());
- final Http2StreamChannel streamChannel = streamChannelBootstrap.open().syncUninterruptibly().getNow();
+
TripleUtil.setClientStream(streamChannel, this);
streamChannel.pipeline().addLast(responseHandler)
.addLast(new GrpcDataDecoder(Integer.MAX_VALUE))
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
index c874455..037a74d 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
@@ -92,7 +92,6 @@ public class TripleInvoker<T> extends AbstractInvoker<T> {
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(inv);
- this.connection.init();
DefaultFuture2 future = DefaultFuture2.newFuture(this.connection, req, timeout, executor);
final CompletableFuture<AppResponse> respFuture = future.thenApply(obj -> (AppResponse) obj);
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
index 4d6f16c..a9cb8ea 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
@@ -20,8 +20,11 @@ import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
+import org.apache.dubbo.common.utils.ExecutorUtil;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.exchange.PortUnificationExchanger;
+import org.apache.dubbo.remoting.transport.AbstractClient;
import org.apache.dubbo.rpc.Exporter;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Protocol;
@@ -31,6 +34,9 @@ import org.apache.dubbo.rpc.protocol.AbstractProtocol;
import java.util.ArrayList;
+import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_CLIENT_THREADPOOL;
+import static org.apache.dubbo.common.constants.CommonConstants.THREADPOOL_KEY;
+
/**
*
*/
@@ -38,6 +44,8 @@ public class TripleProtocol extends AbstractProtocol implements Protocol {
private static final Logger logger = LoggerFactory.getLogger(TripleProtocol.class);
private final PathResolver pathResolver = ExtensionLoader.getExtensionLoader(PathResolver.class).getDefaultExtension();
+ private final ExecutorRepository executorRepository = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();
+
@Override
public int getDefaultPort() {
@@ -72,7 +80,10 @@ public class TripleProtocol extends AbstractProtocol implements Protocol {
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
TripleInvoker<T> invoker;
try {
- invoker = new TripleInvoker<T>(type, url, invokers);
+ url = ExecutorUtil.setThreadName(url,"DubboClientHandler");
+ url = url.addParameterIfAbsent(THREADPOOL_KEY, DEFAULT_CLIENT_THREADPOOL);
+ executorRepository.createExecutorIfAbsent(url);
+ invoker = new TripleInvoker<>(type, url, invokers);
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
}