You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/03/24 13:42:54 UTC

[pulsar] 02/02: [refactor][proxy] Refactor Proxy code and fix connection stalling by switching to auto read mode (#14713)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 766e5fe90ea331490771a032c31e96d78640c425
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Tue Mar 22 05:58:10 2022 +0200

    [refactor][proxy] Refactor Proxy code and fix connection stalling by switching to auto read mode (#14713)
    
    ### Motivation
    
    Refactor Proxy code to make it easier to understand and maintain. In addition, switch to use auto read mode since the proxies connections seem to stall in some cases since the proxied connection doesn't use Netty's auto read mode and the read handling doesn't seem complete.
    
    Currently, the proxy calls `.read()` when a message is written to the connection. There might be more messages flowing in the other direction and it could result in a blocked connection with the current solution that doesn't use Netty's auto read mode.
    
    Currently auto read is disabled in DirectProxyHandler for the connection between the proxy and the broker:
    https://github.com/apache/pulsar/blob/a26905371749798ec5288fb07a69978a36aacfaa/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java#L112
    
    ### Modifications
    
    - replace broker host parsing with a simple solution
    - pass remote host name to ProxyBackendHandler in the constructor
    - rename "targetBrokerUrl" to "brokerHostAndPort" since the "targetBrokerUrl" is really "hostname:port" string
    - move HA proxy message handling to ProxyBackendHandle and extract the logic to a method
    - remove the static "inboundOutboundChannelMap" which was used for log level 2
      - make it obsolete by passing the peer channel id to ParserProxyHandler
     - Enable auto read in proxy and remove `ctx.read()` / `channel.read()` calls
    - prepare for IPv6 support (reported as #14732) by improving the `host:port` parsing (pick last `:` since IPv6 address might contains multiple `:` characters)
    - Handle backpressure properly by switching auto read off when channel writability changes
      - change auto read of the proxy-broker connection based on the writability of the client-proxy connection
      - change auto read of the client-proxy connection based on the writability of the proxy-broker connection
    - Consistently handle write errors by delegating exception handling to exceptionCaught method by using `.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE)`
    
    (cherry picked from commit a1037c75c9c305369799b71d840fa73cb198b293)
---
 .../pulsar/proxy/server/BrokerProxyValidator.java  |   2 +-
 .../pulsar/proxy/server/DirectProxyHandler.java    | 161 +++++++++------------
 .../pulsar/proxy/server/ParserProxyHandler.java    |   8 +-
 .../pulsar/proxy/server/ProxyConnection.java       |  72 ++++-----
 .../proxy/server/BrokerProxyValidatorTest.java     |  20 +++
 5 files changed, 126 insertions(+), 137 deletions(-)

diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerProxyValidator.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerProxyValidator.java
index debe1f7..b0529c2 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerProxyValidator.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerProxyValidator.java
@@ -113,7 +113,7 @@ public class BrokerProxyValidator {
     }
 
     public CompletableFuture<InetSocketAddress> resolveAndCheckTargetAddress(String hostAndPort) {
-        int pos = hostAndPort.indexOf(':');
+        int pos = hostAndPort.lastIndexOf(':');
         String host = hostAndPort.substring(0, pos);
         int port = Integer.parseInt(hostAndPort.substring(pos + 1));
         if (!isPortAllowed(port)) {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index edf4f18..0cbceb1 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -26,9 +26,9 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelId;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.socket.SocketChannel;
@@ -40,14 +40,8 @@ import io.netty.handler.codec.haproxy.HAProxyProxiedProtocol;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.handler.timeout.ReadTimeoutHandler;
 import io.netty.util.CharsetUtil;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.FutureListener;
 import java.net.InetSocketAddress;
-import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.Arrays;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import javax.net.ssl.SSLSession;
@@ -71,11 +65,11 @@ public class DirectProxyHandler {
 
     @Getter
     private final Channel inboundChannel;
+    private final ProxyConnection proxyConnection;
     @Getter
     Channel outboundChannel;
     @Getter
     private final Rate inboundChannelRequestsRate;
-    protected static Map<ChannelId, ChannelId> inboundOutboundChannelMap = new ConcurrentHashMap<>();
     private final String originalPrincipal;
     private final AuthData clientAuthData;
     private final String clientAuthMethod;
@@ -86,12 +80,13 @@ public class DirectProxyHandler {
     private final ProxyService service;
     private final Runnable onHandshakeCompleteAction;
 
-    public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, String targetBrokerUrl,
+    public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, String brokerHostAndPort,
                               InetSocketAddress targetBrokerAddress, int protocolVersion,
                               Supplier<SslHandler> sslHandlerSupplier) {
         this.service = service;
         this.authentication = proxyConnection.getClientAuthentication();
         this.inboundChannel = proxyConnection.ctx().channel();
+        this.proxyConnection = proxyConnection;
         this.inboundChannelRequestsRate = new Rate();
         this.originalPrincipal = proxyConnection.clientAuthRole;
         this.clientAuthData = proxyConnection.clientAuthData;
@@ -109,7 +104,18 @@ public class DirectProxyHandler {
         if (brokerProxyConnectTimeoutMs > 0) {
             b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, brokerProxyConnectTimeoutMs);
         }
-        b.group(inboundChannel.eventLoop()).channel(inboundChannel.getClass()).option(ChannelOption.AUTO_READ, false);
+        b.group(inboundChannel.eventLoop())
+                .channel(inboundChannel.getClass());
+
+        String remoteHost;
+        try {
+            remoteHost = parseHost(brokerHostAndPort);
+        } catch (IllegalArgumentException e) {
+            log.warn("[{}] Failed to parse broker host '{}'", inboundChannel, brokerHostAndPort, e);
+            inboundChannel.close();
+            return;
+        }
+
         b.handler(new ChannelInitializer<SocketChannel>() {
             @Override
             protected void initChannel(SocketChannel ch) {
@@ -123,55 +129,40 @@ public class DirectProxyHandler {
                 }
                 ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
                     Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
-                ch.pipeline().addLast("proxyOutboundHandler", new ProxyBackendHandler(config, protocolVersion));
+                ch.pipeline().addLast("proxyOutboundHandler",
+                        new ProxyBackendHandler(config, protocolVersion, remoteHost));
             }
         });
 
-        URI targetBroker;
-        try {
-            // targetBrokerUrl is coming in the "hostname:6650" form, so we need
-            // to extract host and port
-            targetBroker = new URI("pulsar://" + targetBrokerUrl);
-        } catch (URISyntaxException e) {
-            log.warn("[{}] Failed to parse broker url '{}'", inboundChannel, targetBrokerUrl, e);
-            inboundChannel.close();
-            return;
-        }
-
         ChannelFuture f = b.connect(targetBrokerAddress);
         outboundChannel = f.channel();
         f.addListener(future -> {
             if (!future.isSuccess()) {
                 // Close the connection if the connection attempt has failed.
                 log.warn("[{}] Establishing connection to {} ({}) failed. Closing inbound channel.", inboundChannel,
-                        targetBrokerAddress, targetBrokerUrl, future.cause());
+                        targetBrokerAddress, brokerHostAndPort, future.cause());
                 inboundChannel.close();
                 return;
             }
-            final ProxyBackendHandler cnx = (ProxyBackendHandler) outboundChannel.pipeline()
-                    .get("proxyOutboundHandler");
-            cnx.setRemoteHostName(targetBroker.getHost());
-
-            // if enable full parsing feature
-            if (service.getProxyLogLevel() == 2) {
-                //Set a map between inbound and outbound,
-                //so can find inbound by outbound or find outbound by inbound
-                inboundOutboundChannelMap.put(outboundChannel.id(), inboundChannel.id());
-            }
+        });
+    }
 
-            if (!config.isHaProxyProtocolEnabled()) {
-                return;
-            }
+    private static String parseHost(String brokerPortAndHost) {
+        int pos = brokerPortAndHost.lastIndexOf(':');
+        if (pos > 0) {
+            return brokerPortAndHost.substring(0, pos);
+        } else {
+            throw new IllegalArgumentException("Illegal broker host:port '" + brokerPortAndHost + "'");
+        }
+    }
 
-            if (proxyConnection.hasHAProxyMessage()) {
-                outboundChannel.writeAndFlush(encodeProxyProtocolMessage(proxyConnection.getHAProxyMessage()));
-            } else {
-                if (!(inboundChannel.remoteAddress() instanceof InetSocketAddress)) {
-                    return;
-                }
-                if (!(outboundChannel.localAddress() instanceof InetSocketAddress)) {
-                    return;
-                }
+    private void writeHAProxyMessage() {
+        if (proxyConnection.hasHAProxyMessage()) {
+            outboundChannel.writeAndFlush(encodeProxyProtocolMessage(proxyConnection.getHAProxyMessage()))
+                    .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+        } else {
+            if (inboundChannel.remoteAddress() instanceof InetSocketAddress
+                    && outboundChannel.localAddress() instanceof InetSocketAddress) {
                 InetSocketAddress clientAddress = (InetSocketAddress) inboundChannel.remoteAddress();
                 String sourceAddress = clientAddress.getAddress().getHostAddress();
                 int sourcePort = clientAddress.getPort();
@@ -179,13 +170,17 @@ public class DirectProxyHandler {
                 String destinationAddress = proxyAddress.getAddress().getHostAddress();
                 int destinationPort = proxyAddress.getPort();
                 HAProxyMessage msg = new HAProxyMessage(HAProxyProtocolVersion.V1, HAProxyCommand.PROXY,
-                        HAProxyProxiedProtocol.TCP4, sourceAddress, destinationAddress, sourcePort, destinationPort);
-                outboundChannel.writeAndFlush(encodeProxyProtocolMessage(msg));
+                        HAProxyProxiedProtocol.TCP4, sourceAddress, destinationAddress, sourcePort,
+                        destinationPort);
+                outboundChannel.writeAndFlush(encodeProxyProtocolMessage(msg))
+                        .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
                 msg.release();
             }
-        });
+        }
     }
 
+
+
     private ByteBuf encodeProxyProtocolMessage(HAProxyMessage msg) {
         // Max length of v1 version proxy protocol message is 108
         ByteBuf out = Unpooled.buffer(108);
@@ -217,30 +212,45 @@ public class DirectProxyHandler {
         Init, HandshakeCompleted
     }
 
-    public class ProxyBackendHandler extends PulsarDecoder implements FutureListener<Void> {
+    public class ProxyBackendHandler extends PulsarDecoder {
 
         private BackendState state = BackendState.Init;
-        private String remoteHostName;
+        private final String remoteHostName;
         protected ChannelHandlerContext ctx;
         private final ProxyConfiguration config;
         private final int protocolVersion;
 
-        public ProxyBackendHandler(ProxyConfiguration config, int protocolVersion) {
+        public ProxyBackendHandler(ProxyConfiguration config, int protocolVersion, String remoteHostName) {
             this.config = config;
             this.protocolVersion = protocolVersion;
+            this.remoteHostName = remoteHostName;
         }
 
         @Override
         public void channelActive(ChannelHandlerContext ctx) throws Exception {
             this.ctx = ctx;
+
+            if (config.isHaProxyProtocolEnabled()) {
+                writeHAProxyMessage();
+            }
+
             // Send the Connect command to broker
             authenticationDataProvider = authentication.getAuthData(remoteHostName);
             AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
             ByteBuf command;
             command = Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion, "Pulsar proxy",
                     null /* target broker */, originalPrincipal, clientAuthData, clientAuthMethod);
-            outboundChannel.writeAndFlush(command);
-            outboundChannel.read();
+            outboundChannel.writeAndFlush(command)
+                    .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+        }
+
+        @Override
+        public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
+            // handle backpressure
+            // stop/resume reading input from connection between the client and the proxy
+            // when the writability of the connection between the proxy and the broker changes
+            inboundChannel.config().setAutoRead(ctx.channel().isWritable());
+            super.channelWritabilityChanged(ctx);
         }
 
         @Override
@@ -261,7 +271,8 @@ public class DirectProxyHandler {
                 if (msg instanceof ByteBuf) {
                     ProxyService.BYTES_COUNTER.inc(((ByteBuf) msg).readableBytes());
                 }
-                inboundChannel.writeAndFlush(msg).addListener(this);
+                inboundChannel.writeAndFlush(msg)
+                        .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
                 break;
 
             default:
@@ -300,27 +311,14 @@ public class DirectProxyHandler {
                     log.debug("{} Mutual auth {}", ctx.channel(), authentication.getAuthMethodName());
                 }
 
-                outboundChannel.writeAndFlush(request);
-                outboundChannel.read();
+                outboundChannel.writeAndFlush(request)
+                        .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
             } catch (Exception e) {
                 log.error("Error mutual verify", e);
             }
         }
 
         @Override
-        public void operationComplete(Future<Void> future) {
-            // This is invoked when the write operation on the paired connection
-            // is completed
-            if (future.isSuccess()) {
-                outboundChannel.read();
-            } else {
-                log.warn("[{}] [{}] Failed to write on proxy connection. Closing both connections.", inboundChannel,
-                        outboundChannel, future.cause());
-                inboundChannel.close();
-            }
-        }
-
-        @Override
         protected void messageReceived() {
             // no-op
         }
@@ -349,18 +347,7 @@ public class DirectProxyHandler {
             int maxMessageSize =
                     connected.hasMaxMessageSize() ? connected.getMaxMessageSize() : Commands.INVALID_MAX_MESSAGE_SIZE;
             inboundChannel.writeAndFlush(Commands.newConnected(connected.getProtocolVersion(), maxMessageSize))
-                    .addListener(future -> {
-                        if (future.isSuccess()) {
-                            // Start reading from both connections
-                            inboundChannel.read();
-                            outboundChannel.read();
-                        } else {
-                            log.warn("[{}] [{}] Failed to write to inbound connection. Closing both connections.",
-                                    inboundChannel,
-                                    outboundChannel, future.cause());
-                            inboundChannel.close();
-                        }
-                    });
+                    .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
         }
 
         private void startDirectProxying(CommandConnected connected) {
@@ -389,20 +376,20 @@ public class DirectProxyHandler {
                     inboundChannel.pipeline().addBefore("handler", "inboundParser",
                             new ParserProxyHandler(service, inboundChannel,
                                     ParserProxyHandler.FRONTEND_CONN,
-                                    connected.getMaxMessageSize()));
+                                    connected.getMaxMessageSize(), outboundChannel.id()));
                     outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser",
                             new ParserProxyHandler(service, outboundChannel,
                                     ParserProxyHandler.BACKEND_CONN,
-                                    connected.getMaxMessageSize()));
+                                    connected.getMaxMessageSize(), inboundChannel.id()));
                 } else {
                     inboundChannel.pipeline().addBefore("handler", "inboundParser",
                             new ParserProxyHandler(service, inboundChannel,
                                     ParserProxyHandler.FRONTEND_CONN,
-                                    Commands.DEFAULT_MAX_MESSAGE_SIZE));
+                                    Commands.DEFAULT_MAX_MESSAGE_SIZE, outboundChannel.id()));
                     outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser",
                             new ParserProxyHandler(service, outboundChannel,
                                     ParserProxyHandler.BACKEND_CONN,
-                                    Commands.DEFAULT_MAX_MESSAGE_SIZE));
+                                    Commands.DEFAULT_MAX_MESSAGE_SIZE, inboundChannel.id()));
                 }
             }
         }
@@ -418,10 +405,6 @@ public class DirectProxyHandler {
             ctx.close();
         }
 
-        public void setRemoteHostName(String remoteHostName) {
-            this.remoteHostName = remoteHostName;
-        }
-
         private boolean verifyTlsHostName(String hostname, ChannelHandlerContext ctx) {
             ChannelHandler sslHandler = ctx.channel().pipeline().get("tls");
 
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
index fea1a40..41a9b59 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
@@ -25,6 +25,7 @@ import io.netty.buffer.CompositeByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelId;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -53,6 +54,7 @@ public class ParserProxyHandler extends ChannelInboundHandlerAdapter {
     private final String connType;
 
     private final int maxMessageSize;
+    private final ChannelId peerChannelId;
     private final ProxyService service;
 
 
@@ -66,11 +68,13 @@ public class ParserProxyHandler extends ChannelInboundHandlerAdapter {
      */
     private static final Map<String, String> consumerHashMap = new ConcurrentHashMap<>();
 
-    public ParserProxyHandler(ProxyService service, Channel channel, String type, int maxMessageSize) {
+    public ParserProxyHandler(ProxyService service, Channel channel, String type, int maxMessageSize,
+                              ChannelId peerChannelId) {
         this.service = service;
         this.channel = channel;
         this.connType = type;
         this.maxMessageSize = maxMessageSize;
+        this.peerChannelId = peerChannelId;
     }
 
     private void logging(Channel conn, BaseCommand.Type cmdtype, String info, List<RawMessage> messages) {
@@ -154,7 +158,7 @@ public class ParserProxyHandler extends ChannelInboundHandlerAdapter {
                         break;
                     }
                     topicName = TopicName.get(ParserProxyHandler.consumerHashMap.get(cmd.getMessage().getConsumerId()
-                            + "," + DirectProxyHandler.inboundOutboundChannelMap.get(ctx.channel().id())));
+                            + "," + peerChannelId));
                     msgBytes = new MutableLong(0);
                     MessageParser.parseMessage(topicName, -1L,
                             -1L, buffer, (message) -> {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 382e475..58203ee 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -20,12 +20,11 @@ package org.apache.pulsar.proxy.server;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.haproxy.HAProxyMessage;
 import io.netty.handler.ssl.SslHandler;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.FutureListener;
 import java.net.SocketAddress;
 import java.util.Collections;
 import java.util.List;
@@ -65,7 +64,7 @@ import org.slf4j.LoggerFactory;
  * Handles incoming discovery request from client and sends appropriate response back to client.
  *
  */
-public class ProxyConnection extends PulsarHandler implements FutureListener<Void> {
+public class ProxyConnection extends PulsarHandler {
     private static final Logger LOG = LoggerFactory.getLogger(ProxyConnection.class);
     // ConnectionPool is used by the proxy to issue lookup requests
     private ConnectionPool connectionPool;
@@ -187,6 +186,17 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
     }
 
     @Override
+    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
+        if (directProxyHandler != null && directProxyHandler.outboundChannel != null) {
+            // handle backpressure
+            // stop/resume reading input from connection between the proxy and the broker
+            // when the writability of the connection between the client and the proxy changes
+            directProxyHandler.outboundChannel.config().setAutoRead(ctx.channel().isWritable());
+        }
+        super.channelWritabilityChanged(ctx);
+    }
+
+    @Override
     public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
         if (msg instanceof HAProxyMessage) {
             haProxyMessage = (HAProxyMessage) msg;
@@ -209,7 +219,8 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
                 directProxyHandler.getInboundChannelRequestsRate().recordEvent(bytes);
                 ProxyService.BYTES_COUNTER.inc(bytes);
             }
-            directProxyHandler.outboundChannel.writeAndFlush(msg).addListener(this);
+            directProxyHandler.outboundChannel.writeAndFlush(msg)
+                    .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
             break;
 
         default:
@@ -217,18 +228,6 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
         }
     }
 
-    @Override
-    public void operationComplete(Future<Void> future) {
-        // This is invoked when the write operation on the paired connection is
-        // completed
-        if (future.isSuccess()) {
-            ctx.read();
-        } else {
-            LOG.warn("[{}] Error in writing to inbound channel. Closing", remoteAddress, future.cause());
-            directProxyHandler.outboundChannel.close();
-        }
-    }
-
     private synchronized void completeConnect(AuthData clientData) throws PulsarClientException {
         if (service.getConfiguration().isAuthenticationEnabled()) {
             if (service.getConfiguration().isForwardAuthorizationCredentials()) {
@@ -266,18 +265,18 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
                 ctx()
                         .writeAndFlush(
                                 Commands.newError(-1, ServerError.ServiceNotReady, "Target broker isn't available."))
-                        .addListener(future -> ctx().close());
+                        .addListener(ChannelFutureListener.CLOSE);
                 return;
             }
 
             brokerProxyValidator.resolveAndCheckTargetAddress(proxyToBrokerUrl)
-                    .thenAccept(address -> ctx().executor().submit(() -> {
+                    .thenAcceptAsync(address -> {
                         // Client already knows which broker to connect. Let's open a
                         // connection there and just pass bytes in both directions
                         state = State.ProxyConnectionToBroker;
                         directProxyHandler = new DirectProxyHandler(service, this, proxyToBrokerUrl, address,
                                 protocolVersionToAdvertise, sslHandlerSupplier);
-                    }))
+                    }, ctx.executor())
                     .exceptionally(throwable -> {
                         if (throwable instanceof TargetAddressDeniedException
                                 || throwable.getCause() instanceof TargetAddressDeniedException) {
@@ -296,7 +295,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
                                 .writeAndFlush(
                                         Commands.newError(-1, ServerError.ServiceNotReady,
                                                 "Target broker cannot be validated."))
-                                .addListener(future -> ctx().close());
+                                .addListener(ChannelFutureListener.CLOSE);
                         return null;
                     });
         } else {
@@ -305,7 +304,8 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
             // partitions metadata lookups
             state = State.ProxyLookupRequests;
             lookupProxyHandler = new LookupProxyHandler(service, this);
-            ctx.writeAndFlush(Commands.newConnected(protocolVersionToAdvertise));
+            ctx.writeAndFlush(Commands.newConnected(protocolVersionToAdvertise))
+                    .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
         }
     }
 
@@ -324,7 +324,8 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
         }
 
         // auth not complete, continue auth with client side.
-        ctx.writeAndFlush(Commands.newAuthChallenge(authMethod, brokerData, protocolVersionToAdvertise));
+        ctx.writeAndFlush(Commands.newAuthChallenge(authMethod, brokerData, protocolVersionToAdvertise))
+                .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
         if (LOG.isDebugEnabled()) {
             LOG.debug("[{}] Authentication in progress client by method {}.",
                 remoteAddress, authMethod);
@@ -402,8 +403,8 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
             doAuthentication(clientData);
         } catch (Exception e) {
             LOG.warn("[{}] Unable to authenticate: ", remoteAddress, e);
-            ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, "Failed to authenticate"));
-            close();
+            ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, "Failed to authenticate"))
+                    .addListener(ChannelFutureListener.CLOSE);
         }
     }
 
@@ -424,8 +425,8 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
         } catch (Exception e) {
             String msg = "Unable to handleAuthResponse";
             LOG.warn("[{}] {} ", remoteAddress, msg, e);
-            ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, msg));
-            close();
+            ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, msg))
+                    .addListener(ChannelFutureListener.CLOSE);
         }
     }
 
@@ -458,25 +459,6 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
         lookupProxyHandler.handleLookup(lookup);
     }
 
-    private synchronized void close() {
-        if (state != State.Closed) {
-            state = State.Closed;
-            if (directProxyHandler != null && directProxyHandler.outboundChannel != null) {
-                directProxyHandler.outboundChannel.close();
-                directProxyHandler = null;
-            }
-            if (connectionPool != null) {
-                try {
-                    connectionPool.close();
-                    connectionPool = null;
-                } catch (Exception e) {
-                    LOG.error("Error closing connection pool", e);
-                }
-            }
-            ctx.close();
-        }
-    }
-
     ClientConfigurationData createClientConfiguration() {
         ClientConfigurationData clientConf = new ClientConfigurationData();
         clientConf.setServiceUrl(service.getServiceUrl());
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/BrokerProxyValidatorTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/BrokerProxyValidatorTest.java
index 8e45755..fba3c36 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/BrokerProxyValidatorTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/BrokerProxyValidatorTest.java
@@ -90,6 +90,26 @@ public class BrokerProxyValidatorTest {
         brokerProxyValidator.resolveAndCheckTargetAddress("myhost.mydomain:6650").get();
     }
 
+    @Test
+    public void shouldAllowIPv6Address() throws Exception {
+        BrokerProxyValidator brokerProxyValidator = new BrokerProxyValidator(
+                createMockedAddressResolver("fd4d:801b:73fa:abcd:0000:0000:0000:0001"),
+                "*"
+                , "fd4d:801b:73fa:abcd::/64"
+                , "6650");
+        brokerProxyValidator.resolveAndCheckTargetAddress("myhost.mydomain:6650").get();
+    }
+
+    @Test
+    public void shouldAllowIPv6AddressNumeric() throws Exception {
+        BrokerProxyValidator brokerProxyValidator = new BrokerProxyValidator(
+                createMockedAddressResolver("fd4d:801b:73fa:abcd:0000:0000:0000:0001"),
+                "*"
+                , "fd4d:801b:73fa:abcd::/64"
+                , "6650");
+        brokerProxyValidator.resolveAndCheckTargetAddress("fd4d:801b:73fa:abcd:0000:0000:0000:0001:6650").get();
+    }
+
     private AddressResolver<InetSocketAddress> createMockedAddressResolver(String ipAddressResult) {
         AddressResolver<InetSocketAddress> inetSocketAddressResolver = mock(AddressResolver.class);
         when(inetSocketAddressResolver.resolve(any())).then(invocationOnMock -> {