You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain-text copy.
Posted to commits@dubbo.apache.org by li...@apache.org on 2021/02/02 07:08:51 UTC

[dubbo] branch 3.0 updated: Add :authority and :scheme to triple headers (#7156)

This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new c132438  Add :authority and :scheme to triple headers (#7156)
c132438 is described below

commit c132438c01dba18d6d0479b591b29753bbe4d204
Author: GuoHao <gu...@gmail.com>
AuthorDate: Tue Feb 2 15:08:17 2021 +0800

    Add :authority and :scheme to triple headers (#7156)
---
 .../org/apache/dubbo/remoting/api/Connection.java  | 179 ++++-----------------
 .../dubbo/remoting/api/ConnectionHandler.java      |  67 +++-----
 .../api/SingleProtocolConnectionManager.java       |   8 +-
 .../dubbo/rpc/protocol/tri/ClientStream.java       |  16 +-
 .../dubbo/rpc/protocol/tri/TripleInvoker.java      |   1 -
 .../dubbo/rpc/protocol/tri/TripleProtocol.java     |  13 +-
 6 files changed, 83 insertions(+), 201 deletions(-)

diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/Connection.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/Connection.java
index 85777cf..49ba1ae 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/Connection.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/Connection.java
@@ -17,7 +17,6 @@
 package org.apache.dubbo.remoting.api;
 
 import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.Version;
 import org.apache.dubbo.common.extension.ExtensionLoader;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
@@ -26,34 +25,30 @@ import org.apache.dubbo.common.utils.NamedThreadFactory;
 import org.apache.dubbo.common.utils.NetUtils;
 import org.apache.dubbo.remoting.Constants;
 import org.apache.dubbo.remoting.RemotingException;
-import org.apache.dubbo.remoting.utils.UrlUtils;
 
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.ChannelPipeline;
 import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.timeout.IdleStateHandler;
 import io.netty.util.AbstractReferenceCounted;
 import io.netty.util.AttributeKey;
 import io.netty.util.HashedWheelTimer;
+import io.netty.util.ReferenceCountUtil;
 import io.netty.util.ReferenceCounted;
 import io.netty.util.Timer;
 import io.netty.util.concurrent.DefaultPromise;
-import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GlobalEventExecutor;
-import io.netty.util.concurrent.ImmediateEventExecutor;
 import io.netty.util.concurrent.Promise;
 
 import java.net.InetSocketAddress;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_CLIENT_THREADPOOL;
 import static org.apache.dubbo.common.constants.CommonConstants.THREADPOOL_KEY;
 import static org.apache.dubbo.remoting.api.NettyEventLoopFactory.socketChannelClass;
@@ -65,27 +60,23 @@ public class Connection extends AbstractReferenceCounted implements ReferenceCou
     public static final AttributeKey<Connection> CONNECTION = AttributeKey.valueOf("connection");
     private static final Logger logger = LoggerFactory.getLogger(Connection.class);
     private final URL url;
-    private final Bootstrap bootstrap;
     private final int connectTimeout;
     private final WireProtocol protocol;
     private final Promise<Void> closeFuture;
     private final InetSocketAddress remote;
-    private final AtomicReference<ConnectionStatus> status;
-    private volatile Channel channel;
-    private volatile Future<Channel> connectFuture;
-    private int retryAttempts = 1;
-    private long lastReconnectTime;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final AtomicReference<Channel> channel = new AtomicReference<>();
 
     public Connection(URL url) {
         url = ExecutorUtil.setThreadName(url, "DubboClientHandler");
         url = url.addParameterIfAbsent(THREADPOOL_KEY, DEFAULT_CLIENT_THREADPOOL);
         this.url = url;
-        this.status = new AtomicReference<>(ConnectionStatus.DISCONNECTED);
         this.protocol = ExtensionLoader.getExtensionLoader(WireProtocol.class).getExtension(url.getProtocol());
         this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT);
         this.closeFuture = new DefaultPromise<>(GlobalEventExecutor.INSTANCE);
         this.remote = getConnectAddress();
-        this.bootstrap = open();
+        final Bootstrap bootstrap = open();
+        bootstrap.connect();
     }
 
     public static Connection getConnectionFromChannel(Channel channel) {
@@ -97,16 +88,13 @@ public class Connection extends AbstractReferenceCounted implements ReferenceCou
     }
 
 
-    public void init() throws RemotingException {
-        concurrentConnect();
-    }
-
     public Bootstrap open() {
         final Bootstrap bootstrap = new Bootstrap();
         bootstrap.group(NettyEventLoopFactory.NIO_EVENT_LOOP_GROUP)
                 .option(ChannelOption.SO_KEEPALIVE, true)
                 .option(ChannelOption.TCP_NODELAY, true)
                 .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+                .remoteAddress(getConnectAddress())
                 .channel(socketChannelClass());
 
         bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(3000, connectTimeout));
@@ -116,11 +104,11 @@ public class Connection extends AbstractReferenceCounted implements ReferenceCou
             @Override
             protected void initChannel(SocketChannel ch) {
                 ch.attr(CONNECTION).set(Connection.this);
-                Connection.this.channel = ch;
-                int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
                 // TODO support SSL
                 final ChannelPipeline p = ch.pipeline();//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
-                p.addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS));
+                // TODO support IDLE
+//                int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
+//                p.addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS));
                 p.addLast(connectionHandler);
                 // TODO support ssl
                 protocol.configClientPipeline(p, null);
@@ -131,66 +119,43 @@ public class Connection extends AbstractReferenceCounted implements ReferenceCou
     }
 
     public Channel getChannel() {
-        return channel;
+        return channel.get();
     }
 
     @Override
     public String toString() {
-        return getRemote() + ",channel=" + channel + ", status=" + status;
-    }
-
-    public void onIdle() {
-        if (logger.isDebugEnabled()) {
-            logger.debug(String.format("Connection:%s disconnected cause idle", this));
-        }
-        setStatus(ConnectionStatus.DISCONNECTED);
-        this.channel = null;
+        return "(Ref=" + ReferenceCountUtil.refCnt(this) + ",local="+(getChannel()==null?null:getChannel().localAddress())+",remote="+getRemote();
     }
 
-    public boolean onDisConnected() {
-        if (logger.isDebugEnabled()) {
-            logger.debug(String.format("Connection:%s disconnected ", this));
+    public void onGoaway(Channel channel) {
+        if (this.channel.compareAndSet(channel, null)) {
+            if (logger.isInfoEnabled()) {
+                logger.info(String.format("Connection:%s  goaway", this));
+            }
         }
-        return setStatus(ConnectionStatus.DISCONNECTED);
     }
 
     public void onConnected(Channel channel) {
-        setStatus(ConnectionStatus.CONNECTED);
-        this.channel = channel;
+        this.channel.set(channel);
         channel.attr(CONNECTION).set(this);
-        if (logger.isDebugEnabled()) {
-            logger.debug(String.format("Connection:%s connected ", this));
+        if (logger.isInfoEnabled()) {
+            logger.info(String.format("Connection:%s connected ", this));
         }
     }
 
     public boolean isAvailable() {
-        return ConnectionStatus.CONNECTED == getStatus() && channel != null && channel.isActive();
-    }
-
-    public ConnectionStatus getStatus() {
-        return status.get();
-    }
-
-    public boolean setStatus(ConnectionStatus status) {
-        return this.status.getAndSet(status) != status;
+        final Channel channel = getChannel();
+        return channel != null && channel.isActive();
     }
 
     public boolean isClosed() {
-        return getStatus() == ConnectionStatus.CLOSED;
-    }
-
-    public void close() {
-        setStatus(ConnectionStatus.CLOSED);
-        if (channel != null) {
-            channel.close();
-        }
+        return closed.get();
     }
 
     public ChannelFuture write(Object request) throws RemotingException {
-        if (channel == null || !channel.isActive()) {
+        if (!isAvailable()) {
             throw new RemotingException(null, null, "Failed to send request " + request + ", cause: The channel to " + remote + " is closed!");
         }
-
         return getChannel().writeAndFlush(request);
     }
 
@@ -200,13 +165,20 @@ public class Connection extends AbstractReferenceCounted implements ReferenceCou
 
     @Override
     protected void deallocate() {
-        setStatus(ConnectionStatus.CLOSED);
-        if (channel != null) {
-            channel.close();
+        if (closed.compareAndSet(false, true)) {
+            close();
         }
         closeFuture.setSuccess(null);
     }
 
+    public void close() {
+        final Channel current = this.channel.get();
+        if (current != null) {
+            current.close();
+        }
+        this.channel.set(null);
+    }
+
     @Override
     public ReferenceCounted touch(Object hint) {
         return this;
@@ -216,80 +188,6 @@ public class Connection extends AbstractReferenceCounted implements ReferenceCou
         return new InetSocketAddress(NetUtils.filterLocalHost(getUrl().getHost()), getUrl().getPort());
     }
 
-    protected Future<Channel> connectAsync() {
-        final Promise<Channel> promise = ImmediateEventExecutor.INSTANCE.newPromise();
-        ChannelFuture future = bootstrap.connect(getRemote());
-        final ChannelFutureListener listener = new ChannelFutureListener() {
-            @Override
-            public void operationComplete(ChannelFuture future) {
-                if (bootstrap.config().group().isShuttingDown()) {
-                    promise.tryFailure(new IllegalStateException("Client is shutdown"));
-                    return;
-                }
-                if (future.isSuccess()) {
-                    if (logger.isInfoEnabled()) {
-                        logger.info("Succeed connect to server " + future.channel().remoteAddress() + " from " + getClass().getSimpleName() + " "
-                                + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
-                                + ", channel is " + future.channel());
-                    }
-                    bootstrap.config().group().execute(() -> promise.setSuccess(future.channel()));
-                } else {
-                    bootstrap.config().group().execute(() -> {
-                        final RemotingException cause = new RemotingException(null, getRemote(), "client(url: " + getUrl() + ") failed to connect to server "
-                                + future.channel().remoteAddress() + ", error message is:" + future.cause().getMessage(), future.cause());
-                        promise.tryFailure(cause);
-                    });
-                }
-                Connection.this.connectFuture = null;
-            }
-        };
-        future.addListener(listener);
-        return promise;
-    }
-
-
-    private void concurrentConnect() throws RemotingException {
-        if (channel != null && channel.isActive()) {
-            return;
-        }
-        synchronized (this) {
-            if (channel != null && channel.isActive()) {
-                return;
-            }
-            if (connectFuture != null) {
-                connectFuture.awaitUninterruptibly(getConnectTimeout());
-            } else {
-                int backoff = 2 << retryAttempts;
-                if (System.currentTimeMillis() - lastReconnectTime < backoff) {
-                    return;
-                }
-                connectWithGuard();
-            }
-        }
-    }
-
-    protected synchronized void connectWithGuard() throws RemotingException {
-        this.lastReconnectTime = System.currentTimeMillis();
-        this.retryAttempts = Math.min(12, retryAttempts + 1);
-        long start = System.currentTimeMillis();
-        final Future<Channel> connectFuture = connectAsync();
-        this.connectFuture = connectFuture;
-        connectFuture.awaitUninterruptibly(getConnectTimeout());
-        if (!connectFuture.isSuccess()) {
-            if (connectFuture.isDone()) {
-                throw new RemotingException(null, getRemote(), "client(url: " + getUrl() + ") failed to connect to server .error message is:" + connectFuture.cause().getMessage(),
-                        connectFuture.cause());
-            } else {
-                throw new RemotingException(null, getRemote(), "client(url: " + getUrl() + ") failed to connect to server. client-side timeout "
-                        + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
-                        + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
-            }
-        } else {
-            this.retryAttempts = 0;
-        }
-    }
-
-
     /**
      * get url.
      *
@@ -302,14 +200,5 @@ public class Connection extends AbstractReferenceCounted implements ReferenceCou
     private int getConnectTimeout() {
         return connectTimeout;
     }
-
-    public enum ConnectionStatus {
-        DISCONNECTED,
-        CONNECTING,
-        CONNECTED,
-        READ_ONLY,
-        CLOSED
-    }
-
 }
 
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/ConnectionHandler.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/ConnectionHandler.java
index de5c151..23b36d7 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/ConnectionHandler.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/ConnectionHandler.java
@@ -25,11 +25,9 @@ import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.handler.timeout.IdleStateEvent;
 import io.netty.util.AttributeKey;
 import io.netty.util.Timer;
 
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
 @ChannelHandler.Sharable
@@ -37,11 +35,10 @@ public class ConnectionHandler extends ChannelInboundHandlerAdapter {
     private static final Logger log = LoggerFactory.getLogger(ConnectionHandler.class);
 
     private static final int MIN_FAST_RECONNECT_INTERVAL = 4000;
-    private static final int BACKOFF_CAP = 12;
+    private static final int BACKOFF_CAP = 15;
     private static final AttributeKey<Boolean> GO_AWAY_KEY = AttributeKey.valueOf("dubbo_channel_goaway");
     private final Timer timer;
     private final Bootstrap bootstrap;
-    private final Semaphore permit = new Semaphore(1);
     private volatile long lastReconnect;
 
     public ConnectionHandler(Bootstrap bootstrap, Timer timer) {
@@ -53,8 +50,9 @@ public class ConnectionHandler extends ChannelInboundHandlerAdapter {
         channel.attr(GO_AWAY_KEY).set(true);
         final Connection connection = Connection.getConnectionFromChannel(channel);
         if (connection != null) {
-            connection.onIdle();
+            connection.onGoaway(channel);
         }
+        tryReconnect(connection);
     }
 
     @Override
@@ -78,18 +76,6 @@ public class ConnectionHandler extends ChannelInboundHandlerAdapter {
     }
 
 
-    @Override
-    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
-        super.userEventTriggered(ctx, evt);
-        if (evt instanceof IdleStateEvent) {
-            final Connection connection = Connection.getConnectionFromChannel(ctx.channel());
-            if (connection != null) {
-                connection.onIdle();
-                ctx.close();
-            }
-        }
-    }
-
     private boolean isGoAway(Channel channel) {
         return Boolean.TRUE.equals(channel.attr(GO_AWAY_KEY).get());
     }
@@ -98,25 +84,28 @@ public class ConnectionHandler extends ChannelInboundHandlerAdapter {
     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
         // Reconnect event will be triggered by Connection.init();
         if (isGoAway(ctx.channel())) {
+            ctx.fireChannelInactive();
             return;
         }
         Connection connection = Connection.getConnectionFromChannel(ctx.channel());
-        if (connection != null) {
-            if (!connection.isClosed() && connection.onDisConnected()) {
-                if (shouldFastReconnect()) {
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Connection %s inactive, schedule fast reconnect", connection));
-                    }
-                    reconnect(connection, 1);
-                } else {
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Connection %s inactive, schedule normal reconnect", connection));
-                    }
-                    reconnect(connection, BACKOFF_CAP);
+        tryReconnect(connection);
+        ctx.fireChannelInactive();
+    }
+
+    private void tryReconnect(Connection connection) {
+        if (connection != null && !connection.isClosed()) {
+            if (shouldFastReconnect()) {
+                if (log.isInfoEnabled()) {
+                    log.info(String.format("Connection %s inactive, schedule fast reconnect", connection));
                 }
+                reconnect(connection, 1);
+            } else {
+                if (log.isInfoEnabled()) {
+                    log.info(String.format("Connection %s inactive, schedule normal reconnect", connection));
+                }
+                reconnect(connection, BACKOFF_CAP);
             }
         }
-        ctx.fireChannelInactive();
     }
 
     private void reconnect(final Connection connection, final int attempts) {
@@ -127,20 +116,18 @@ public class ConnectionHandler extends ChannelInboundHandlerAdapter {
             return;
         }
 
-        if (permit.tryAcquire()) {
-            timer.newTimeout(timeout1 -> tryReconnect(connection, Math.min(BACKOFF_CAP, attempts + 1)), timeout, TimeUnit.MILLISECONDS);
-        }
+        int nextAttempt = Math.min(BACKOFF_CAP, attempts + 1);
+        timer.newTimeout(timeout1 -> tryReconnect(connection, nextAttempt), timeout, TimeUnit.MILLISECONDS);
     }
 
 
     private void tryReconnect(final Connection connection, final int nextAttempt) {
-        permit.release();
 
         if (connection.isClosed() || bootstrap.config().group().isShuttingDown()) {
             return;
         }
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Connection %s is reconnecting, attempt=%d", connection, nextAttempt));
+        if (log.isInfoEnabled()) {
+            log.info(String.format("Connection %s is reconnecting, attempt=%d", connection, nextAttempt));
         }
 
         bootstrap.connect(connection.getRemote()).addListener((ChannelFutureListener) future -> {
@@ -157,16 +144,14 @@ public class ConnectionHandler extends ChannelInboundHandlerAdapter {
 
             if (future.isSuccess()) {
                 final Channel channel = future.channel();
-                connection.onConnected(channel);
                 if (!connection.isClosed()) {
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("%s connected to %s", connection, connection.getRemote()));
-                    }
+                    connection.onConnected(channel);
                 } else {
                     channel.close();
                 }
+            } else {
+                reconnect(connection, nextAttempt);
             }
-            reconnect(connection, nextAttempt);
         });
     }
 
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/SingleProtocolConnectionManager.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/SingleProtocolConnectionManager.java
index a4f8e07..e662e65 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/SingleProtocolConnectionManager.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/SingleProtocolConnectionManager.java
@@ -24,8 +24,6 @@ import io.netty.util.internal.PlatformDependent;
 import java.util.concurrent.ConcurrentMap;
 import java.util.function.Consumer;
 
-import static org.apache.dubbo.common.constants.CommonConstants.LAZY_CONNECT_KEY;
-
 public class SingleProtocolConnectionManager implements ConnectionManager {
     private final ConcurrentMap<String, Connection> connections = PlatformDependent.newConcurrentHashMap();
 
@@ -34,7 +32,7 @@ public class SingleProtocolConnectionManager implements ConnectionManager {
         if (url == null) {
             throw new IllegalArgumentException("url == null");
         }
-        final Connection connection = connections.compute(url.getAddress(), (address, conn) -> {
+        return connections.compute(url.getAddress(), (address, conn) -> {
             if (conn == null) {
                 final Connection created = new Connection(url);
                 created.getCloseFuture().addListener(future -> connections.remove(address, created));
@@ -44,10 +42,6 @@ public class SingleProtocolConnectionManager implements ConnectionManager {
                 return conn;
             }
         });
-        if (!url.getParameter(LAZY_CONNECT_KEY, false)) {
-            connection.init();
-        }
-        return connection;
     }
 
     @Override
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
index 1e72180..69ced1c 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
@@ -18,8 +18,6 @@ package org.apache.dubbo.rpc.protocol.tri;
 
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.constants.CommonConstants;
-import org.apache.dubbo.common.logger.Logger;
-import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.remoting.Constants;
 import org.apache.dubbo.remoting.api.Connection;
 import org.apache.dubbo.remoting.exchange.Request;
@@ -48,6 +46,7 @@ import io.netty.handler.codec.http2.Http2Headers;
 import io.netty.handler.codec.http2.Http2NoMoreStreamIdsException;
 import io.netty.handler.codec.http2.Http2StreamChannel;
 import io.netty.handler.codec.http2.Http2StreamChannelBootstrap;
+import io.netty.util.AsciiString;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -57,18 +56,19 @@ import static org.apache.dubbo.rpc.Constants.CONSUMER_MODEL;
 import static org.apache.dubbo.rpc.protocol.tri.TripleUtil.responseErr;
 
 public class ClientStream extends AbstractStream implements Stream {
-    private static final Logger LOGGER = LoggerFactory.getLogger(ClientStream.class);
     private static final GrpcStatus MISSING_RESP = GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
             .withDescription("Missing Response");
+    private static final AsciiString SCHEME = AsciiString.of("http");
+    private final String authority;
     private final Request request;
     private final RpcInvocation invocation;
 
-
     public ClientStream(URL url, ChannelHandlerContext ctx, boolean needWrap, Request request) {
         super(url, ctx, needWrap);
         if (needWrap) {
             setSerializeType((String) ((RpcInvocation) (request.getData())).getObjectAttachment(Constants.SERIALIZATION_KEY));
         }
+        this.authority = url.getAddress();
         this.request = request;
         this.invocation = (RpcInvocation) request.getData();
     }
@@ -97,7 +97,12 @@ public class ClientStream extends AbstractStream implements Stream {
 
     @Override
     public void write(Object obj, ChannelPromise promise) throws IOException {
+        final Http2StreamChannelBootstrap streamChannelBootstrap = new Http2StreamChannelBootstrap(getCtx().channel());
+        final Http2StreamChannel streamChannel = streamChannelBootstrap.open().syncUninterruptibly().getNow();
+
         Http2Headers headers = new DefaultHttp2Headers()
+                .authority(authority)
+                .scheme(SCHEME)
                 .method(HttpMethod.POST.asciiName())
                 .path("/" + invocation.getObjectAttachment(CommonConstants.PATH_KEY) + "/" + invocation.getMethodName())
                 .set(HttpHeaderNames.CONTENT_TYPE, TripleConstant.CONTENT_PROTO)
@@ -127,8 +132,7 @@ public class ClientStream extends AbstractStream implements Stream {
         DefaultHttp2HeadersFrame frame = new DefaultHttp2HeadersFrame(headers);
         final TripleHttp2ClientResponseHandler responseHandler = new TripleHttp2ClientResponseHandler();
 
-        final Http2StreamChannelBootstrap streamChannelBootstrap = new Http2StreamChannelBootstrap(getCtx().channel());
-        final Http2StreamChannel streamChannel = streamChannelBootstrap.open().syncUninterruptibly().getNow();
+
         TripleUtil.setClientStream(streamChannel, this);
         streamChannel.pipeline().addLast(responseHandler)
                 .addLast(new GrpcDataDecoder(Integer.MAX_VALUE))
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
index c874455..037a74d 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
@@ -92,7 +92,6 @@ public class TripleInvoker<T> extends AbstractInvoker<T> {
             req.setVersion(Version.getProtocolVersion());
             req.setTwoWay(true);
             req.setData(inv);
-            this.connection.init();
 
             DefaultFuture2 future = DefaultFuture2.newFuture(this.connection, req, timeout, executor);
             final CompletableFuture<AppResponse> respFuture = future.thenApply(obj -> (AppResponse) obj);
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
index 4d6f16c..a9cb8ea 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
@@ -20,8 +20,11 @@ import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.extension.ExtensionLoader;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
+import org.apache.dubbo.common.utils.ExecutorUtil;
 import org.apache.dubbo.remoting.RemotingException;
 import org.apache.dubbo.remoting.exchange.PortUnificationExchanger;
+import org.apache.dubbo.remoting.transport.AbstractClient;
 import org.apache.dubbo.rpc.Exporter;
 import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.Protocol;
@@ -31,6 +34,9 @@ import org.apache.dubbo.rpc.protocol.AbstractProtocol;
 
 import java.util.ArrayList;
 
+import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_CLIENT_THREADPOOL;
+import static org.apache.dubbo.common.constants.CommonConstants.THREADPOOL_KEY;
+
 /**
  *
  */
@@ -38,6 +44,8 @@ public class TripleProtocol extends AbstractProtocol implements Protocol {
 
     private static final Logger logger = LoggerFactory.getLogger(TripleProtocol.class);
     private final PathResolver pathResolver = ExtensionLoader.getExtensionLoader(PathResolver.class).getDefaultExtension();
+    private final ExecutorRepository executorRepository = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();
+
 
     @Override
     public int getDefaultPort() {
@@ -72,7 +80,10 @@ public class TripleProtocol extends AbstractProtocol implements Protocol {
     public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
         TripleInvoker<T> invoker;
         try {
-            invoker = new TripleInvoker<T>(type, url, invokers);
+            url = ExecutorUtil.setThreadName(url,"DubboClientHandler");
+            url = url.addParameterIfAbsent(THREADPOOL_KEY, DEFAULT_CLIENT_THREADPOOL);
+            executorRepository.createExecutorIfAbsent(url);
+            invoker = new TripleInvoker<>(type, url, invokers);
         } catch (RemotingException e) {
             throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
         }