You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/06/01 14:43:10 UTC

[pulsar] 03/10: [refactor][proxy] Refactor Proxy code and fix connection stalling by switching to auto read mode (#14713)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 58ad46722c25301a995f798eeec92366ed01ffc6
Author: Lari Hotari <lh...@apache.org>
AuthorDate: Tue Mar 22 15:58:43 2022 +0200

    [refactor][proxy] Refactor Proxy code and fix connection stalling by switching to auto read mode (#14713)
    
    Refactor Proxy code to make it easier to understand and maintain. In addition, switch to use auto read mode since the proxies connections seem to stall in some cases since the proxied connection doesn't use Netty's auto read mode and the read handling doesn't seem complete.
    
    Currently, the proxy calls `.read()` when a message is written to the connection. There might be more messages flowing in the other direction and it could result in a blocked connection with the current solution that doesn't use Netty's auto read mode.
    
    Currently auto read is disabled in DirectProxyHandler for the connection between the proxy and the broker:
    https://github.com/apache/pulsar/blob/a26905371749798ec5288fb07a69978a36aacfaa/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java#L112
    
    - replace broker host parsing with a simple solution
    - pass remote host name to ProxyBackendHandler in the constructor
    - rename "targetBrokerUrl" to "brokerHostAndPort" since the "targetBrokerUrl" is really "hostname:port" string
    - move HA proxy message handling to ProxyBackendHandle and extract the logic to a method
    - remove the static "inboundOutboundChannelMap" which was used for log level 2
      - make it obsolete by passing the peer channel id to ParserProxyHandler
     - Enable auto read in proxy and remove `ctx.read()` / `channel.read()` calls
    - prepare for IPv6 support (reported as #14732) by improving the `host:port` parsing (pick last `:` since IPv6 address might contains multiple `:` characters)
    - Handle backpressure properly by switching auto read off when channel writability changes
      - change auto read of the proxy-broker connection based on the writability of the client-proxy connection
      - change auto read of the client-proxy connection based on the writability of the proxy-broker connection
    - Consistently handle write errors by delegating exception handling to exceptionCaught method by using `.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE)`
    
    (cherry picked from commit a1037c75c9c305369799b71d840fa73cb198b293)
    (cherry picked from commit 1e55685329e99f08fa50f4c99cf7f7bed9ffe2e0)
    (cherry picked from commit 9673d6cffa6e0ba92ab538cc19f1f91b6e4bf4ad)
---
 .../pulsar/proxy/server/BrokerProxyValidator.java  |   2 +-
 .../pulsar/proxy/server/DirectProxyHandler.java    | 180 ++++++++++-----------
 .../pulsar/proxy/server/ParserProxyHandler.java    | 114 +++++++------
 .../pulsar/proxy/server/ProxyConnection.java       |  70 ++++----
 .../proxy/server/BrokerProxyValidatorTest.java     |  20 +++
 5 files changed, 194 insertions(+), 192 deletions(-)

diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerProxyValidator.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerProxyValidator.java
index 3c49653cff9..b255b9c4ebd 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerProxyValidator.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerProxyValidator.java
@@ -113,7 +113,7 @@ public class BrokerProxyValidator {
     }
 
     public CompletableFuture<InetSocketAddress> resolveAndCheckTargetAddress(String hostAndPort) {
-        int pos = hostAndPort.indexOf(':');
+        int pos = hostAndPort.lastIndexOf(':');
         String host = hostAndPort.substring(0, pos);
         int port = Integer.parseInt(hostAndPort.substring(pos + 1));
         if (!isPortAllowed(port)) {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index 1587b72c831..8fa7787215d 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -21,15 +21,14 @@ package org.apache.pulsar.proxy.server;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
-
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelId;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.socket.SocketChannel;
@@ -41,22 +40,12 @@ import io.netty.handler.codec.haproxy.HAProxyProxiedProtocol;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.handler.timeout.ReadTimeoutHandler;
 import io.netty.util.CharsetUtil;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.FutureListener;
-
 import java.net.InetSocketAddress;
-import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.Arrays;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
-
 import javax.net.ssl.SSLSession;
-
 import lombok.Getter;
-
 import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
@@ -76,11 +65,11 @@ public class DirectProxyHandler {
 
     @Getter
     private final Channel inboundChannel;
+    private final ProxyConnection proxyConnection;
     @Getter
     Channel outboundChannel;
     @Getter
     private final Rate inboundChannelRequestsRate;
-    protected static Map<ChannelId, ChannelId> inboundOutboundChannelMap = new ConcurrentHashMap<>();
     private final String originalPrincipal;
     private final AuthData clientAuthData;
     private final String clientAuthMethod;
@@ -91,12 +80,13 @@ public class DirectProxyHandler {
     private final ProxyService service;
     private final Runnable onHandshakeCompleteAction;
 
-    public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, String targetBrokerUrl,
+    public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, String brokerHostAndPort,
                               InetSocketAddress targetBrokerAddress, int protocolVersion,
                               Supplier<SslHandler> sslHandlerSupplier) {
         this.service = service;
         this.authentication = proxyConnection.getClientAuthentication();
         this.inboundChannel = proxyConnection.ctx().channel();
+        this.proxyConnection = proxyConnection;
         this.inboundChannelRequestsRate = new Rate();
         this.originalPrincipal = proxyConnection.clientAuthRole;
         this.clientAuthData = proxyConnection.clientAuthData;
@@ -114,7 +104,18 @@ public class DirectProxyHandler {
         if (brokerProxyConnectTimeoutMs > 0) {
             b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, brokerProxyConnectTimeoutMs);
         }
-        b.group(inboundChannel.eventLoop()).channel(inboundChannel.getClass()).option(ChannelOption.AUTO_READ, false);
+        b.group(inboundChannel.eventLoop())
+                .channel(inboundChannel.getClass());
+
+        String remoteHost;
+        try {
+            remoteHost = parseHost(brokerHostAndPort);
+        } catch (IllegalArgumentException e) {
+            log.warn("[{}] Failed to parse broker host '{}'", inboundChannel, brokerHostAndPort, e);
+            inboundChannel.close();
+            return;
+        }
+
         b.handler(new ChannelInitializer<SocketChannel>() {
             @Override
             protected void initChannel(SocketChannel ch) {
@@ -128,65 +129,58 @@ public class DirectProxyHandler {
                 }
                 ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
                     Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
-                ch.pipeline().addLast("proxyOutboundHandler", new ProxyBackendHandler(config, protocolVersion));
+                ch.pipeline().addLast("proxyOutboundHandler",
+                        new ProxyBackendHandler(config, protocolVersion, remoteHost));
             }
         });
 
-        URI targetBroker;
-        try {
-            // targetBrokerUrl is coming in the "hostname:6650" form, so we need
-            // to extract host and port
-            targetBroker = new URI("pulsar://" + targetBrokerUrl);
-        } catch (URISyntaxException e) {
-            log.warn("[{}] Failed to parse broker url '{}'", inboundChannel, targetBrokerUrl, e);
-            inboundChannel.close();
-            return;
-        }
-
         ChannelFuture f = b.connect(targetBrokerAddress);
         outboundChannel = f.channel();
         f.addListener(future -> {
             if (!future.isSuccess()) {
                 // Close the connection if the connection attempt has failed.
                 log.warn("[{}] Establishing connection to {} ({}) failed. Closing inbound channel.", inboundChannel,
-                        targetBrokerAddress, targetBrokerUrl, future.cause());
+                        targetBrokerAddress, brokerHostAndPort, future.cause());
                 inboundChannel.close();
                 return;
             }
-            final ProxyBackendHandler cnx = (ProxyBackendHandler) outboundChannel.pipeline()
-                    .get("proxyOutboundHandler");
-            cnx.setRemoteHostName(targetBroker.getHost());
-
-            // if enable full parsing feature
-            if (service.getProxyLogLevel() == 2) {
-                //Set a map between inbound and outbound,
-                //so can find inbound by outbound or find outbound by inbound
-                inboundOutboundChannelMap.put(outboundChannel.id() , inboundChannel.id());
-            }
+        });
+    }
 
-            if (config.isHaProxyProtocolEnabled()) {
-                if (proxyConnection.hasHAProxyMessage()) {
-                    outboundChannel.writeAndFlush(encodeProxyProtocolMessage(proxyConnection.getHAProxyMessage()));
-                } else {
-                    if (inboundChannel.remoteAddress() instanceof InetSocketAddress) {
-                        InetSocketAddress clientAddress = (InetSocketAddress) inboundChannel.remoteAddress();
-                        String sourceAddress = clientAddress.getAddress().getHostAddress();
-                        int sourcePort = clientAddress.getPort();
-                        if (outboundChannel.localAddress() instanceof InetSocketAddress) {
-                            InetSocketAddress proxyAddress = (InetSocketAddress) inboundChannel.remoteAddress();
-                            String destinationAddress = proxyAddress.getAddress().getHostAddress();
-                            int destinationPort = proxyAddress.getPort();
-                            HAProxyMessage msg = new HAProxyMessage(HAProxyProtocolVersion.V1, HAProxyCommand.PROXY,
-                                    HAProxyProxiedProtocol.TCP4, sourceAddress, destinationAddress, sourcePort, destinationPort);
-                            outboundChannel.writeAndFlush(encodeProxyProtocolMessage(msg));
-                            msg.release();
-                        }
-                    }
-                }
+    private static String parseHost(String brokerPortAndHost) {
+        int pos = brokerPortAndHost.lastIndexOf(':');
+        if (pos > 0) {
+            return brokerPortAndHost.substring(0, pos);
+        } else {
+            throw new IllegalArgumentException("Illegal broker host:port '" + brokerPortAndHost + "'");
+        }
+    }
+
+    private void writeHAProxyMessage() {
+        if (proxyConnection.hasHAProxyMessage()) {
+            outboundChannel.writeAndFlush(encodeProxyProtocolMessage(proxyConnection.getHAProxyMessage()))
+                    .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+        } else {
+            if (inboundChannel.remoteAddress() instanceof InetSocketAddress
+                    && outboundChannel.localAddress() instanceof InetSocketAddress) {
+                InetSocketAddress clientAddress = (InetSocketAddress) inboundChannel.remoteAddress();
+                String sourceAddress = clientAddress.getAddress().getHostAddress();
+                int sourcePort = clientAddress.getPort();
+                InetSocketAddress proxyAddress = (InetSocketAddress) inboundChannel.remoteAddress();
+                String destinationAddress = proxyAddress.getAddress().getHostAddress();
+                int destinationPort = proxyAddress.getPort();
+                HAProxyMessage msg = new HAProxyMessage(HAProxyProtocolVersion.V1, HAProxyCommand.PROXY,
+                        HAProxyProxiedProtocol.TCP4, sourceAddress, destinationAddress, sourcePort,
+                        destinationPort);
+                outboundChannel.writeAndFlush(encodeProxyProtocolMessage(msg))
+                        .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+                msg.release();
             }
-        });
+        }
     }
 
+
+
     private ByteBuf encodeProxyProtocolMessage(HAProxyMessage msg) {
         // Max length of v1 version proxy protocol message is 108
         ByteBuf out = Unpooled.buffer(108);
@@ -218,30 +212,45 @@ public class DirectProxyHandler {
         Init, HandshakeCompleted
     }
 
-    public class ProxyBackendHandler extends PulsarDecoder implements FutureListener<Void> {
+    public class ProxyBackendHandler extends PulsarDecoder {
 
         private BackendState state = BackendState.Init;
-        private String remoteHostName;
+        private final String remoteHostName;
         protected ChannelHandlerContext ctx;
         private final ProxyConfiguration config;
         private final int protocolVersion;
 
-        public ProxyBackendHandler(ProxyConfiguration config, int protocolVersion) {
+        public ProxyBackendHandler(ProxyConfiguration config, int protocolVersion, String remoteHostName) {
             this.config = config;
             this.protocolVersion = protocolVersion;
+            this.remoteHostName = remoteHostName;
         }
 
         @Override
         public void channelActive(ChannelHandlerContext ctx) throws Exception {
             this.ctx = ctx;
+
+            if (config.isHaProxyProtocolEnabled()) {
+                writeHAProxyMessage();
+            }
+
             // Send the Connect command to broker
             authenticationDataProvider = authentication.getAuthData(remoteHostName);
             AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
             ByteBuf command;
             command = Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion, "Pulsar proxy",
                     null /* target broker */, originalPrincipal, clientAuthData, clientAuthMethod);
-            outboundChannel.writeAndFlush(command);
-            outboundChannel.read();
+            outboundChannel.writeAndFlush(command)
+                    .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+        }
+
+        @Override
+        public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
+            // handle backpressure
+            // stop/resume reading input from connection between the client and the proxy
+            // when the writability of the connection between the proxy and the broker changes
+            inboundChannel.config().setAutoRead(ctx.channel().isWritable());
+            super.channelWritabilityChanged(ctx);
         }
 
         @Override
@@ -262,7 +271,8 @@ public class DirectProxyHandler {
                 if (msg instanceof ByteBuf) {
                     ProxyService.bytesCounter.inc(((ByteBuf) msg).readableBytes());
                 }
-                inboundChannel.writeAndFlush(msg).addListener(this);
+                inboundChannel.writeAndFlush(msg)
+                        .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
                 break;
 
             default:
@@ -301,26 +311,13 @@ public class DirectProxyHandler {
                     log.debug("{} Mutual auth {}", ctx.channel(), authentication.getAuthMethodName());
                 }
 
-                outboundChannel.writeAndFlush(request);
-                outboundChannel.read();
+                outboundChannel.writeAndFlush(request)
+                        .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
             } catch (Exception e) {
                 log.error("Error mutual verify", e);
             }
         }
 
-        @Override
-        public void operationComplete(Future<Void> future) {
-            // This is invoked when the write operation on the paired connection
-            // is completed
-            if (future.isSuccess()) {
-                outboundChannel.read();
-            } else {
-                log.warn("[{}] [{}] Failed to write on proxy connection. Closing both connections.", inboundChannel,
-                        outboundChannel, future.cause());
-                inboundChannel.close();
-            }
-        }
-
         @Override
         protected void messageReceived() {
             // no-op
@@ -350,18 +347,7 @@ public class DirectProxyHandler {
             int maxMessageSize =
                     connected.hasMaxMessageSize() ? connected.getMaxMessageSize() : Commands.INVALID_MAX_MESSAGE_SIZE;
             inboundChannel.writeAndFlush(Commands.newConnected(connected.getProtocolVersion(), maxMessageSize))
-                    .addListener(future -> {
-                        if (future.isSuccess()) {
-                            // Start reading from both connections
-                            inboundChannel.read();
-                            outboundChannel.read();
-                        } else {
-                            log.warn("[{}] [{}] Failed to write to inbound connection. Closing both connections.",
-                                    inboundChannel,
-                                    outboundChannel, future.cause());
-                            inboundChannel.close();
-                        }
-                    });
+                    .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
         }
 
         private void startDirectProxying(CommandConnected connected) {
@@ -390,20 +376,20 @@ public class DirectProxyHandler {
                     inboundChannel.pipeline().addBefore("handler", "inboundParser",
                             new ParserProxyHandler(service, inboundChannel,
                                     ParserProxyHandler.FRONTEND_CONN,
-                                    connected.getMaxMessageSize()));
+                                    connected.getMaxMessageSize(), outboundChannel.id()));
                     outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser",
                             new ParserProxyHandler(service, outboundChannel,
                                     ParserProxyHandler.BACKEND_CONN,
-                                    connected.getMaxMessageSize()));
+                                    connected.getMaxMessageSize(), inboundChannel.id()));
                 } else {
                     inboundChannel.pipeline().addBefore("handler", "inboundParser",
                             new ParserProxyHandler(service, inboundChannel,
                                     ParserProxyHandler.FRONTEND_CONN,
-                                    Commands.DEFAULT_MAX_MESSAGE_SIZE));
+                                    Commands.DEFAULT_MAX_MESSAGE_SIZE, outboundChannel.id()));
                     outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser",
                             new ParserProxyHandler(service, outboundChannel,
                                     ParserProxyHandler.BACKEND_CONN,
-                                    Commands.DEFAULT_MAX_MESSAGE_SIZE));
+                                    Commands.DEFAULT_MAX_MESSAGE_SIZE, inboundChannel.id()));
                 }
             }
         }
@@ -419,10 +405,6 @@ public class DirectProxyHandler {
             ctx.close();
         }
 
-        public void setRemoteHostName(String remoteHostName) {
-            this.remoteHostName = remoteHostName;
-        }
-
         private boolean verifyTlsHostName(String hostname, ChannelHandlerContext ctx) {
             ChannelHandler sslHandler = ctx.channel().pipeline().get("tls");
 
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
index 32f4ba73c43..77a6404c7d2 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
@@ -19,10 +19,19 @@
 
 package org.apache.pulsar.proxy.server;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelId;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-
 import org.apache.commons.lang3.mutable.MutableLong;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.raw.MessageParser;
@@ -33,59 +42,62 @@ import org.apache.pulsar.proxy.stats.TopicStats;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufUtil;
-import io.netty.buffer.CompositeByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-
 
 public class ParserProxyHandler extends ChannelInboundHandlerAdapter {
 
 
-    private Channel channel;
+    private final Channel channel;
     //inbound
     protected static final String FRONTEND_CONN = "frontendconn";
     //outbound
     protected static final String BACKEND_CONN = "backendconn";
 
-    private String connType;
+    private final String connType;
 
-    private int maxMessageSize;
+    private final int maxMessageSize;
+    private final ChannelId peerChannelId;
     private final ProxyService service;
 
 
-    //producerid+channelid as key
-    //or consumerid+channelid as key
-    private static Map<String, String> producerHashMap = new ConcurrentHashMap<>();
-    private static Map<String, String> consumerHashMap = new ConcurrentHashMap<>();
+    /**
+     * producerid + channelid as key.
+     */
+    private static final Map<String, String> producerHashMap = new ConcurrentHashMap<>();
+
+    /**
+     * consumerid + channelid as key.
+     */
+    private static final Map<String, String> consumerHashMap = new ConcurrentHashMap<>();
 
-    public ParserProxyHandler(ProxyService service, Channel channel, String type, int maxMessageSize) {
+    public ParserProxyHandler(ProxyService service, Channel channel, String type, int maxMessageSize,
+                              ChannelId peerChannelId) {
         this.service = service;
         this.channel = channel;
         this.connType = type;
         this.maxMessageSize = maxMessageSize;
+        this.peerChannelId = peerChannelId;
     }
 
     private void logging(Channel conn, PulsarApi.BaseCommand.Type cmdtype, String info, List<RawMessage> messages) throws Exception{
 
         if (messages != null) {
             // lag
-            for (int i=0; i<messages.size(); i++) {
-                info = info + "["+ (System.currentTimeMillis() - messages.get(i).getPublishTime()) + "] " + new String(ByteBufUtil.getBytes((messages.get(i)).getData()), "UTF8");
+            StringBuilder infoBuilder = new StringBuilder(info);
+            for (RawMessage message : messages) {
+                infoBuilder.append("[").append(System.currentTimeMillis() - message.getPublishTime()).append("] ")
+                        .append(new String(ByteBufUtil.getBytes(message.getData()), StandardCharsets.UTF_8));
             }
+            info = infoBuilder.toString();
         }
         // log conn format is like from source to target
         switch (this.connType) {
             case ParserProxyHandler.FRONTEND_CONN:
-                log.info(ParserProxyHandler.FRONTEND_CONN + ":{} cmd:{} msg:{}", "[" + conn.remoteAddress() + conn.localAddress() + "]", cmdtype, info);
+                log.info(ParserProxyHandler.FRONTEND_CONN + ":{} cmd:{} msg:{}", "[" + conn.remoteAddress()
+                        + conn.localAddress() + "]", cmdtype, info);
                 break;
             case ParserProxyHandler.BACKEND_CONN:
-                log.info(ParserProxyHandler.BACKEND_CONN + ":{} cmd:{} msg:{}", "[" + conn.localAddress() + conn.remoteAddress() + "]", cmdtype, info);
+                log.info(ParserProxyHandler.BACKEND_CONN + ":{} cmd:{} msg:{}", "[" + conn.localAddress()
+                        + conn.remoteAddress() + "]", cmdtype, info);
                 break;
         }
     }
@@ -93,9 +105,9 @@ public class ParserProxyHandler extends ChannelInboundHandlerAdapter {
     public void channelRead(ChannelHandlerContext ctx, Object msg) {
         PulsarApi.BaseCommand cmd = null;
         PulsarApi.BaseCommand.Builder cmdBuilder = null;
-        TopicName topicName ;
-        List<RawMessage> messages = Lists.newArrayList();
-        ByteBuf buffer = (ByteBuf)(msg);
+        TopicName topicName;
+        List<RawMessage> messages = new ArrayList<>();
+        ByteBuf buffer = (ByteBuf) (msg);
 
         try {
             buffer.markReaderIndex();
@@ -113,20 +125,23 @@ public class ParserProxyHandler extends ChannelInboundHandlerAdapter {
 
             switch (cmd.getType()) {
                 case PRODUCER:
-                    ParserProxyHandler.producerHashMap.put(String.valueOf(cmd.getProducer().getProducerId()) + "," + String.valueOf(ctx.channel().id()), cmd.getProducer().getTopic());
+                    ParserProxyHandler.producerHashMap.put(cmd.getProducer().getProducerId() + "," + ctx.channel().id(),
+                            cmd.getProducer().getTopic());
 
-                    logging(ctx.channel() , cmd.getType() , "{producer:" + cmd.getProducer().getProducerName() + ",topic:" + cmd.getProducer().getTopic() + "}", null);
+                    logging(ctx.channel(), cmd.getType(), "{producer:" + cmd.getProducer().getProducerName()
+                            + ",topic:" + cmd.getProducer().getTopic() + "}", null);
                     break;
 
                 case SEND:
                     if (service.getProxyLogLevel() != 2) {
-                        logging(ctx.channel() , cmd.getType() , "", null);
+                        logging(ctx.channel(), cmd.getType(), "", null);
                         break;
                     }
-                    topicName = TopicName.get(ParserProxyHandler.producerHashMap.get(String.valueOf(cmd.getSend().getProducerId()) + "," + String.valueOf(ctx.channel().id())));
+                    topicName = TopicName.get(ParserProxyHandler.producerHashMap.get(cmd.getSend().getProducerId() + ","
+                            + ctx.channel().id()));
                     MutableLong msgBytes = new MutableLong(0);
-                    MessageParser.parseMessage(topicName,  -1L,
-                            -1L,buffer,(message) -> {
+                    MessageParser.parseMessage(topicName, -1L,
+                            -1L, buffer, (message) -> {
                                 messages.add(message);
                                 msgBytes.add(message.getData().readableBytes());
                             }, maxMessageSize);
@@ -134,42 +149,43 @@ public class ParserProxyHandler extends ChannelInboundHandlerAdapter {
                     TopicStats topicStats = this.service.getTopicStats().computeIfAbsent(topicName.toString(),
                         topic -> new TopicStats());
                     topicStats.getMsgInRate().recordMultipleEvents(messages.size(), msgBytes.longValue());
-                    logging(ctx.channel() , cmd.getType() , "" , messages);
+                    logging(ctx.channel(), cmd.getType(), "", messages);
                     break;
 
                 case SUBSCRIBE:
-                    ParserProxyHandler.consumerHashMap.put(String.valueOf(cmd.getSubscribe().getConsumerId()) + "," + String.valueOf(ctx.channel().id()) , cmd.getSubscribe().getTopic());
+                    ParserProxyHandler.consumerHashMap.put(cmd.getSubscribe().getConsumerId() + ","
+                                    + ctx.channel().id(), cmd.getSubscribe().getTopic());
 
-                    logging(ctx.channel() , cmd.getType() , "{consumer:" + cmd.getSubscribe().getConsumerName() + ",topic:" + cmd.getSubscribe().getTopic() + "}" , null);
+                    logging(ctx.channel(), cmd.getType(), "{consumer:" + cmd.getSubscribe().getConsumerName()
+                            + ",topic:" + cmd.getSubscribe().getTopic() + "}", null);
                     break;
 
                 case MESSAGE:
                     if (service.getProxyLogLevel() != 2) {
-                        logging(ctx.channel() , cmd.getType() , "" , null);
+                        logging(ctx.channel(), cmd.getType(), "", null);
                         break;
                     }
-                    topicName = TopicName.get(ParserProxyHandler.consumerHashMap.get(String.valueOf(cmd.getMessage().getConsumerId()) + "," + DirectProxyHandler.inboundOutboundChannelMap.get(ctx.channel().id())));
+                    topicName = TopicName.get(ParserProxyHandler.consumerHashMap.get(cmd.getMessage().getConsumerId()
+                            + "," + peerChannelId));
                     msgBytes = new MutableLong(0);
-                    MessageParser.parseMessage(topicName,  -1L,
-                                -1L,buffer,(message) -> {
-                                    messages.add(message);
-                                    msgBytes.add(message.getData().readableBytes());
-                                }, maxMessageSize);
+                    MessageParser.parseMessage(topicName, -1L,
+                            -1L, buffer, (message) -> {
+                                messages.add(message);
+                                msgBytes.add(message.getData().readableBytes());
+                            }, maxMessageSize);
                     // update topic stats
                     topicStats = this.service.getTopicStats().computeIfAbsent(topicName.toString(),
                             topic -> new TopicStats());
                     topicStats.getMsgOutRate().recordMultipleEvents(messages.size(), msgBytes.longValue());
-                    logging(ctx.channel() , cmd.getType() , "" , messages);
+                    logging(ctx.channel(), cmd.getType(), "", messages);
                     break;
 
                  default:
-                    logging(ctx.channel() , cmd.getType() , "" , null);
+                    logging(ctx.channel(), cmd.getType(), "", null);
                     break;
             }
         } catch (Exception e){
-
-            log.error("{},{},{}" , e.getMessage() , e.getStackTrace() ,  e.getCause());
-
+            log.error("channelRead error ", e);
         } finally {
 
             if (cmdBuilder != null) {
@@ -185,8 +201,8 @@ public class ParserProxyHandler extends ChannelInboundHandlerAdapter {
             ByteBuf totalSizeBuf = Unpooled.buffer(4);
             totalSizeBuf.writeInt(buffer.readableBytes());
             CompositeByteBuf compBuf = Unpooled.compositeBuffer();
-            compBuf.addComponents(totalSizeBuf,buffer);
-            compBuf.writerIndex(totalSizeBuf.capacity()+buffer.capacity());
+            compBuf.addComponents(totalSizeBuf, buffer);
+            compBuf.writerIndex(totalSizeBuf.capacity() + buffer.capacity());
 
             // Release mssages
             messages.forEach(RawMessage::release);
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index f1e45f07e81..754e33f96a8 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.proxy.server;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import io.netty.channel.ChannelFutureListener;
 import io.netty.handler.codec.haproxy.HAProxyMessage;
 import java.net.SocketAddress;
 import java.util.Collections;
@@ -69,7 +70,7 @@ import lombok.Getter;
  * Handles incoming discovery request from client and sends appropriate response back to client
  *
  */
-public class ProxyConnection extends PulsarHandler implements FutureListener<Void> {
+public class ProxyConnection extends PulsarHandler {
     private static final Logger LOG = LoggerFactory.getLogger(ProxyConnection.class);
     // ConnectionPool is used by the proxy to issue lookup requests
     private ConnectionPool connectionPool;
@@ -189,6 +190,17 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
         ctx.close();
     }
 
+    @Override
+    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
+        if (directProxyHandler != null && directProxyHandler.outboundChannel != null) {
+            // handle backpressure
+            // stop/resume reading input from connection between the proxy and the broker
+            // when the writability of the connection between the client and the proxy changes
+            directProxyHandler.outboundChannel.config().setAutoRead(ctx.channel().isWritable());
+        }
+        super.channelWritabilityChanged(ctx);
+    }
+
     @Override
     public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
         if (msg instanceof HAProxyMessage) {
@@ -212,7 +224,8 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
                 directProxyHandler.getInboundChannelRequestsRate().recordEvent(bytes);
                 ProxyService.bytesCounter.inc(bytes);
             }
-            directProxyHandler.outboundChannel.writeAndFlush(msg).addListener(this);
+            directProxyHandler.outboundChannel.writeAndFlush(msg)
+                    .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
             break;
 
         default:
@@ -220,18 +233,6 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
         }
     }
 
-    @Override
-    public void operationComplete(Future<Void> future) {
-        // This is invoked when the write operation on the paired connection is
-        // completed
-        if (future.isSuccess()) {
-            ctx.read();
-        } else {
-            LOG.warn("[{}] Error in writing to inbound channel. Closing", remoteAddress, future.cause());
-            directProxyHandler.outboundChannel.close();
-        }
-    }
-
     private synchronized void completeConnect(AuthData clientData) throws PulsarClientException {
         if (service.getConfiguration().isAuthenticationEnabled()) {
             if (service.getConfiguration().isForwardAuthorizationCredentials()) {
@@ -269,18 +270,18 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
                 ctx()
                         .writeAndFlush(
                                 Commands.newError(-1, ServerError.ServiceNotReady, "Target broker isn't available."))
-                        .addListener(future -> ctx().close());
+                        .addListener(ChannelFutureListener.CLOSE);
                 return;
             }
 
             brokerProxyValidator.resolveAndCheckTargetAddress(proxyToBrokerUrl)
-                    .thenAccept(address -> ctx().executor().submit(() -> {
+                    .thenAcceptAsync(address -> {
                         // Client already knows which broker to connect. Let's open a
                         // connection there and just pass bytes in both directions
                         state = State.ProxyConnectionToBroker;
                         directProxyHandler = new DirectProxyHandler(service, this, proxyToBrokerUrl, address,
                                 protocolVersionToAdvertise, sslHandlerSupplier);
-                    }))
+                    }, ctx.executor())
                     .exceptionally(throwable -> {
                         if (throwable instanceof TargetAddressDeniedException
                                 || throwable.getCause() instanceof TargetAddressDeniedException) {
@@ -299,7 +300,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
                                 .writeAndFlush(
                                         Commands.newError(-1, ServerError.ServiceNotReady,
                                                 "Target broker cannot be validated."))
-                                .addListener(future -> ctx().close());
+                                .addListener(ChannelFutureListener.CLOSE);
                         return null;
                     });
         } else {
@@ -308,7 +309,8 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
             // partitions metadata lookups
             state = State.ProxyLookupRequests;
             lookupProxyHandler = new LookupProxyHandler(service, this);
-            ctx.writeAndFlush(Commands.newConnected(protocolVersionToAdvertise));
+            ctx.writeAndFlush(Commands.newConnected(protocolVersionToAdvertise))
+                    .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
         }
     }
 
@@ -327,7 +329,8 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
         }
 
         // auth not complete, continue auth with client side.
-        ctx.writeAndFlush(Commands.newAuthChallenge(authMethod, brokerData, protocolVersionToAdvertise));
+        ctx.writeAndFlush(Commands.newAuthChallenge(authMethod, brokerData, protocolVersionToAdvertise))
+                .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
         if (LOG.isDebugEnabled()) {
             LOG.debug("[{}] Authentication in progress client by method {}.",
                 remoteAddress, authMethod);
@@ -406,8 +409,8 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
             doAuthentication(clientData);
         } catch (Exception e) {
             LOG.warn("[{}] Unable to authenticate: ", remoteAddress, e);
-            ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, "Failed to authenticate"));
-            close();
+            ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, "Failed to authenticate"))
+                    .addListener(ChannelFutureListener.CLOSE);
         }
     }
 
@@ -428,8 +431,8 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
         } catch (Exception e) {
             String msg = "Unable to handleAuthResponse";
             LOG.warn("[{}] {} ", remoteAddress, msg, e);
-            ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, msg));
-            close();
+            ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, msg))
+                    .addListener(ChannelFutureListener.CLOSE);
         }
     }
 
@@ -462,25 +465,6 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
         lookupProxyHandler.handleLookup(lookup);
     }
 
-    private synchronized void close() {
-        if (state != State.Closed) {
-            state = State.Closed;
-            if (directProxyHandler != null && directProxyHandler.outboundChannel != null) {
-                directProxyHandler.outboundChannel.close();
-                directProxyHandler = null;
-            }
-            if (connectionPool != null) {
-                try {
-                    connectionPool.close();
-                    connectionPool = null;
-                } catch (Exception e) {
-                    LOG.error("Error closing connection pool", e);
-                }
-            }
-            ctx.close();
-        }
-    }
-
     ClientConfigurationData createClientConfiguration()
             throws PulsarClientException.UnsupportedAuthenticationException {
         ClientConfigurationData clientConf = new ClientConfigurationData();
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/BrokerProxyValidatorTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/BrokerProxyValidatorTest.java
index 8e457554cf5..fba3c36e266 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/BrokerProxyValidatorTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/BrokerProxyValidatorTest.java
@@ -90,6 +90,26 @@ public class BrokerProxyValidatorTest {
         brokerProxyValidator.resolveAndCheckTargetAddress("myhost.mydomain:6650").get();
     }
 
+    @Test
+    public void shouldAllowIPv6Address() throws Exception {
+        BrokerProxyValidator brokerProxyValidator = new BrokerProxyValidator(
+                createMockedAddressResolver("fd4d:801b:73fa:abcd:0000:0000:0000:0001"),
+                "*"
+                , "fd4d:801b:73fa:abcd::/64"
+                , "6650");
+        brokerProxyValidator.resolveAndCheckTargetAddress("myhost.mydomain:6650").get();
+    }
+
+    @Test
+    public void shouldAllowIPv6AddressNumeric() throws Exception {
+        BrokerProxyValidator brokerProxyValidator = new BrokerProxyValidator(
+                createMockedAddressResolver("fd4d:801b:73fa:abcd:0000:0000:0000:0001"),
+                "*"
+                , "fd4d:801b:73fa:abcd::/64"
+                , "6650");
+        brokerProxyValidator.resolveAndCheckTargetAddress("fd4d:801b:73fa:abcd:0000:0000:0000:0001:6650").get();
+    }
+
     private AddressResolver<InetSocketAddress> createMockedAddressResolver(String ipAddressResult) {
         AddressResolver<InetSocketAddress> inetSocketAddressResolver = mock(AddressResolver.class);
         when(inetSocketAddressResolver.resolve(any())).then(invocationOnMock -> {