You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/12/03 05:35:16 UTC

[pulsar] 02/04: Support HAProxy proxy protocol for broker and proxy (#8686)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 38130621f9f8e57419a53dca043d7d362f57f344
Author: lipenghui <pe...@apache.org>
AuthorDate: Mon Nov 30 11:26:37 2020 +0800

    Support HAProxy proxy protocol for broker and proxy (#8686)
    
    Currently, if enable the proxy in the pulsar cluster and client connect to the cluster through the proxy, when we get topic stats, the consumer address and producer address are not the real client address. This PR fix this problem by leverage HAProxy proxy protocol since this is a more general approach.  more details about the proxy protocol see
    https://www.haproxy.com/blog/haproxy/proxy-protocol/
    https://docs.aws.amazon.com/elasticloadbalancing/latest/classic/enable-proxy-protocol.html
    http://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
    
    Allow enable proxy protocol on proxy or broker.
    
    Tests added
    
    (cherry picked from commit 625627c8cc2915974a55c5fa7ae02fe33dd2e4c8)
---
 conf/broker.conf                                   |   3 +
 conf/proxy.conf                                    |   3 +
 conf/standalone.conf                               |   3 +
 distribution/server/src/assemble/LICENSE.bin.txt   |   1 +
 pom.xml                                            |   6 ++
 .../apache/pulsar/broker/ServiceConfiguration.java |   4 +
 .../org/apache/pulsar/broker/service/Consumer.java |  10 +-
 .../org/apache/pulsar/broker/service/Producer.java |   6 +-
 .../broker/service/PulsarChannelInitializer.java   |   4 +
 .../apache/pulsar/broker/service/ServerCnx.java    |  12 +++
 .../apache/pulsar/broker/service/TransportCnx.java |   5 +
 .../broker/service/EnableProxyProtocolTest.java    | 105 ++++++++++++++++++
 .../apache/pulsar/client/impl/ProducerImpl.java    |   2 +-
 pulsar-common/pom.xml                              |   5 +
 .../protocol/OptionalProxyProtocolDecoder.java     |  49 +++++++++
 .../pulsar/common/protocol/PulsarDecoder.java      |  11 ++
 .../pulsar/proxy/server/DirectProxyHandler.java    |  55 +++++++++-
 .../pulsar/proxy/server/ProxyConfiguration.java    |   5 +
 .../pulsar/proxy/server/ProxyConnection.java       |  14 +++
 .../proxy/server/ServiceChannelInitializer.java    |   5 +-
 .../server/ProxyEnableHAProxyProtocolTest.java     | 119 +++++++++++++++++++++
 pulsar-sql/presto-distribution/LICENSE             |   1 +
 22 files changed, 423 insertions(+), 5 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 444180e..ca50d27 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -55,6 +55,9 @@ advertisedAddress=
 # The Default value is absent, the broker uses the first listener as the internal listener.
 # internalListenerName=
 
+# Enable or disable the HAProxy protocol.
+haProxyProtocolEnabled=false
+
 # Number of threads to use for Netty IO. Default is set to 2 * Runtime.getRuntime().availableProcessors()
 numIOThreads=
 
diff --git a/conf/proxy.conf b/conf/proxy.conf
index a257184..d899bb4 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -50,6 +50,9 @@ zooKeeperCacheExpirySeconds=300
 # If not set, the value of `InetAddress.getLocalHost().getHostname()` is used.
 advertisedAddress=
 
+# Enable or disable the HAProxy protocol.
+haProxyProtocolEnabled=false
+
 # The port to use for server binary Protobuf requests
 servicePort=6650
 
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 7ca742a..656cb52 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -36,6 +36,9 @@ bindAddress=0.0.0.0
 # Hostname or IP address the service advertises to the outside world. If not set, the value of InetAddress.getLocalHost().getHostName() is used.
 advertisedAddress=
 
+# Enable or disable the HAProxy protocol.
+haProxyProtocolEnabled=false
+
 # Number of threads to use for Netty IO. Default is set to 2 * Runtime.getRuntime().availableProcessors()
 numIOThreads=
 
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index d5e82ee..8648a19 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -361,6 +361,7 @@ The Apache Software License, Version 2.0
     - io.netty-netty-codec-http-4.1.48.Final.jar
     - io.netty-netty-codec-http2-4.1.48.Final.jar
     - io.netty-netty-codec-socks-4.1.48.Final.jar
+    - io.netty-netty-codec-haproxy-4.1.51.Final.jar
     - io.netty-netty-common-4.1.48.Final.jar
     - io.netty-netty-handler-4.1.48.Final.jar
     - io.netty-netty-handler-proxy-4.1.48.Final.jar
diff --git a/pom.xml b/pom.xml
index f161674..497e32d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -580,6 +580,12 @@ flexible messaging model and an intuitive client API.</description>
       </dependency>
 
       <dependency>
+        <groupId>io.netty</groupId>
+        <artifactId>netty-codec-haproxy</artifactId>
+        <version>4.1.51.Final</version>
+      </dependency>
+
+      <dependency>
         <groupId>com.beust</groupId>
         <artifactId>jcommander</artifactId>
         <version>1.48</version>
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 7d52f78..48b07ff 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -163,6 +163,10 @@ public class ServiceConfiguration implements PulsarConfiguration {
                     + "The Default value is absent, the broker uses the first listener as the internal listener.")
     private String internalListenerName;
 
+    @FieldContext(category=CATEGORY_SERVER,
+            doc = "Enable or disable the proxy protocol.")
+    private boolean haProxyProtocolEnabled;
+
     @FieldContext(
         category = CATEGORY_SERVER,
         doc = "Number of threads to use for Netty IO."
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 968050c..3d397fe 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -158,7 +158,11 @@ public class Consumer {
         this.metadata = metadata != null ? metadata : Collections.emptyMap();
 
         stats = new ConsumerStats();
-        stats.setAddress(cnx.clientAddress().toString());
+        if (cnx.hasHAProxyMessage()) {
+            stats.setAddress(cnx.getHAProxyMessage().sourceAddress() + ":" + cnx.getHAProxyMessage().sourcePort());
+        } else {
+            stats.setAddress(cnx.clientAddress().toString());
+        }
         stats.consumerName = consumerName;
         stats.setConnectedSince(DateFormatter.now());
         stats.setClientVersion(cnx.getClientVersion());
@@ -631,5 +635,9 @@ public class Consumer {
         this.readPositionWhenJoining = readPositionWhenJoining;
     }
 
+    public TransportCnx cnx() {
+        return cnx;
+    }
+
     private static final Logger log = LoggerFactory.getLogger(Consumer.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index 3216d43..0566b94 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -108,7 +108,11 @@ public class Producer {
         this.metadata = metadata != null ? metadata : Collections.emptyMap();
 
         this.stats = isNonPersistentTopic ? new NonPersistentPublisherStats() : new PublisherStats();
-        stats.setAddress(cnx.clientAddress().toString());
+        if (cnx.hasHAProxyMessage()) {
+            stats.setAddress(cnx.getHAProxyMessage().sourceAddress() + ":" + cnx.getHAProxyMessage().sourcePort());
+        } else {
+            stats.setAddress(cnx.clientAddress().toString());
+        }
         stats.setConnectedSince(DateFormatter.now());
         stats.setClientVersion(cnx.getClientVersion());
         stats.setProducerName(producerName);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
index 7425ffb..ae8fe85 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
@@ -35,6 +35,7 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.common.protocol.ByteBufPair;
 import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.OptionalProxyProtocolDecoder;
 import org.apache.pulsar.common.util.NettyServerSslContextBuilder;
 import org.apache.pulsar.common.util.SslContextAutoRefreshBuilder;
 import org.apache.pulsar.common.util.keystoretls.NettySSLContextAutoRefreshBuilder;
@@ -118,6 +119,9 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel>
             ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
         }
 
+        if (pulsar.getConfiguration().isHaProxyProtocolEnabled()) {
+            ch.pipeline().addLast(OptionalProxyProtocolDecoder.NAME, new OptionalProxyProtocolDecoder());
+        }
         ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
             brokerConf.getMaxMessageSize() + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
         // https://stackoverflow.com/questions/37535482/netty-disabling-auto-read-doesnt-work-for-bytetomessagedecoder
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 6497e12..a9800e9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -21,6 +21,8 @@ package org.apache.pulsar.broker.service;
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.unsafeGetPartitionedTopicMetadataAsync;
+
+import io.netty.handler.codec.haproxy.HAProxyMessage;
 import org.apache.pulsar.broker.intercept.BrokerInterceptor;
 import static org.apache.pulsar.broker.lookup.TopicLookupBase.lookupTopicAsync;
 import static org.apache.pulsar.common.protocol.Commands.newLookupErrorResponse;
@@ -1996,6 +1998,16 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         return ctx.newPromise();
     }
 
+    @Override
+    public HAProxyMessage getHAProxyMessage() {
+        return proxyMessage;
+    }
+
+    @Override
+    public boolean hasHAProxyMessage() {
+        return proxyMessage != null;
+    }
+
     boolean hasConsumer(long consumerId) {
         return consumers.containsKey(consumerId);
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java
index 2b43eaa..c6a55f1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service;
 
+import io.netty.handler.codec.haproxy.HAProxyMessage;
 import io.netty.util.concurrent.Promise;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
@@ -77,4 +78,8 @@ public interface TransportCnx {
 
     Promise<Void> newPromise();
 
+    boolean hasHAProxyMessage();
+
+    HAProxyMessage getHAProxyMessage();
+
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/EnableProxyProtocolTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/EnableProxyProtocolTest.java
new file mode 100644
index 0000000..2143c07
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/EnableProxyProtocolTest.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import io.netty.buffer.Unpooled;
+import lombok.Cleanup;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.policies.data.TopicStats;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+public class EnableProxyProtocolTest extends BrokerTestBase  {
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        conf.setHaProxyProtocolEnabled(true);
+        super.baseSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testSimpleProduceAndConsume() throws PulsarClientException {
+        final String namespace = "prop/ns-abc";
+        final String topicName = "persistent://" + namespace + "/testSimpleProduceAndConsume";
+        final String subName = "my-subscriber-name";
+        final int messages = 100;
+
+        @Cleanup
+        org.apache.pulsar.client.api.Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
+                .subscribe();
+
+        @Cleanup
+        org.apache.pulsar.client.api.Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+        for (int i = 0; i < messages; i++) {
+            producer.send(("Message-" + i).getBytes());
+        }
+
+        int received = 0;
+        for (int i = 0; i < messages; i++) {
+            consumer.acknowledge(consumer.receive());
+            received++;
+        }
+
+        Assert.assertEquals(received, messages);
+    }
+
+    @Test
+    public void testProxyProtocol() throws PulsarClientException, ExecutionException, InterruptedException, PulsarAdminException {
+        final String namespace = "prop/ns-abc";
+        final String topicName = "persistent://" + namespace + "/testProxyProtocol";
+        final String subName = "my-subscriber-name";
+        PulsarClientImpl client = (PulsarClientImpl) pulsarClient;
+        CompletableFuture<ClientCnx> cnx = client.getCnxPool().getConnection(InetSocketAddress.createUnresolved("localhost", pulsar.getBrokerListenPort().get()));
+        // Simulate the proxy protcol message
+        cnx.get().ctx().channel().writeAndFlush(Unpooled.copiedBuffer("PROXY TCP4 198.51.100.22 203.0.113.7 35646 80\r\n".getBytes()));
+        pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
+                .subscribe();
+        org.apache.pulsar.broker.service.Consumer c = pulsar.getBrokerService().getTopicReference(topicName).get().getSubscription(subName).getConsumers().get(0);
+        Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> Assert.assertTrue(c.cnx().hasHAProxyMessage()));
+        TopicStats topicStats = admin.topics().getStats(topicName);
+        Assert.assertEquals(topicStats.subscriptions.size(), 1);
+        SubscriptionStats subscriptionStats = topicStats.subscriptions.get(subName);
+        Assert.assertEquals(subscriptionStats.consumers.size(), 1);
+        Assert.assertEquals(subscriptionStats.consumers.get(0).getAddress(), "198.51.100.22:35646");
+
+        pulsarClient.newProducer().topic(topicName).create();
+        topicStats = admin.topics().getStats(topicName);
+        Assert.assertEquals(topicStats.publishers.size(), 1);
+        Assert.assertEquals(topicStats.publishers.get(0).getAddress(), "198.51.100.22:35646");
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index fd2e182..65e605c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1727,7 +1727,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         this.connectionHandler.connectionClosed(cnx);
     }
 
-    ClientCnx getClientCnx() {
+    public ClientCnx getClientCnx() {
         return this.connectionHandler.cnx();
     }
 
diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml
index c78d642..c08be4c 100644
--- a/pulsar-common/pom.xml
+++ b/pulsar-common/pom.xml
@@ -122,6 +122,11 @@
     </dependency>
 
     <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-codec-haproxy</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>org.eclipse.jetty</groupId>
       <artifactId>jetty-util</artifactId>
     </dependency>
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/OptionalProxyProtocolDecoder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/OptionalProxyProtocolDecoder.java
new file mode 100644
index 0000000..aa1c55e
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/OptionalProxyProtocolDecoder.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.protocol;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.ProtocolDetectionResult;
+import io.netty.handler.codec.ProtocolDetectionState;
+import io.netty.handler.codec.haproxy.HAProxyMessageDecoder;
+import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
+
+/**
+ * Decoder that added whether a new connection is prefixed with the ProxyProtocol.
+ * More about the ProxyProtocol see: http://www.haproxy.org/download/1.8/doc/proxy-protocol.txt.
+ */
+public class OptionalProxyProtocolDecoder extends ChannelInboundHandlerAdapter {
+
+    public static final String NAME = "optional-proxy-protocol-decoder";
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        if (msg instanceof ByteBuf) {
+            ProtocolDetectionResult<HAProxyProtocolVersion> result =
+                    HAProxyMessageDecoder.detectProtocol((ByteBuf) msg);
+            if (result.state() == ProtocolDetectionState.DETECTED) {
+                ctx.pipeline().addAfter(NAME, null, new HAProxyMessageDecoder());
+                ctx.pipeline().remove(this);
+            }
+        }
+        super.channelRead(ctx, msg);
+    }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
index 06462b7..a7e1de2 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
@@ -24,6 +24,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 
+import io.netty.handler.codec.haproxy.HAProxyMessage;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck;
@@ -84,8 +85,18 @@ import org.slf4j.LoggerFactory;
  */
 public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter {
 
+    // From the proxy protocol. If present, it means the client is connected via a reverse proxy.
+    // The broker can get the real client address and proxy address from the proxy message.
+    protected HAProxyMessage proxyMessage;
+
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        if (msg instanceof HAProxyMessage) {
+            HAProxyMessage proxyMessage = (HAProxyMessage) msg;
+            this.proxyMessage = proxyMessage;
+            proxyMessage.release();
+            return;
+        }
         // Get a buffer that contains the full frame
         ByteBuf buffer = (ByteBuf) msg;
         BaseCommand cmd = null;
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index e786ba8..6f71fe1 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -24,6 +24,7 @@ import static com.google.common.base.Preconditions.checkState;
 
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandler;
@@ -33,12 +34,18 @@ import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.codec.haproxy.HAProxyCommand;
+import io.netty.handler.codec.haproxy.HAProxyMessage;
+import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
+import io.netty.handler.codec.haproxy.HAProxyProxiedProtocol;
 import io.netty.handler.ssl.SslHandler;
+import io.netty.util.CharsetUtil;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.FutureListener;
 import java.util.function.Supplier;
 import lombok.Getter;
 
+import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Map;
@@ -142,10 +149,56 @@ public class DirectProxyHandler {
                 inboundOutboundChannelMap.put(outboundChannel.id() , inboundChannel.id());
             }
 
-
+            if (config.isHaProxyProtocolEnabled()) {
+                if (proxyConnection.hasHAProxyMessage()) {
+                    outboundChannel.writeAndFlush(encodeProxyProtocolMessage(proxyConnection.getHAProxyMessage()));
+                } else {
+                    if (inboundChannel.remoteAddress() instanceof InetSocketAddress) {
+                        InetSocketAddress clientAddress = (InetSocketAddress) inboundChannel.remoteAddress();
+                        String sourceAddress = clientAddress.getAddress().getHostAddress();
+                        int sourcePort = clientAddress.getPort();
+                        if (outboundChannel.localAddress() instanceof InetSocketAddress) {
+                            InetSocketAddress proxyAddress = (InetSocketAddress) inboundChannel.remoteAddress();
+                            String destinationAddress = proxyAddress.getAddress().getHostAddress();
+                            int destinationPort = proxyAddress.getPort();
+                            HAProxyMessage msg = new HAProxyMessage(HAProxyProtocolVersion.V1, HAProxyCommand.PROXY,
+                                    HAProxyProxiedProtocol.TCP4, sourceAddress, destinationAddress, sourcePort, destinationPort);
+                            outboundChannel.writeAndFlush(encodeProxyProtocolMessage(msg));
+                            msg.release();
+                        }
+                    }
+                }
+            }
         });
     }
 
+    private ByteBuf encodeProxyProtocolMessage(HAProxyMessage msg) {
+        // Max length of v1 version proxy protocol message is 108
+        ByteBuf out = Unpooled.buffer(108);
+        out.writeBytes(TEXT_PREFIX);
+        out.writeByte((byte) ' ');
+        out.writeCharSequence(msg.proxiedProtocol().name(), CharsetUtil.US_ASCII);
+        out.writeByte((byte) ' ');
+        out.writeCharSequence(msg.sourceAddress(), CharsetUtil.US_ASCII);
+        out.writeByte((byte) ' ');
+        out.writeCharSequence(msg.destinationAddress(), CharsetUtil.US_ASCII);
+        out.writeByte((byte) ' ');
+        out.writeCharSequence(String.valueOf(msg.sourcePort()), CharsetUtil.US_ASCII);
+        out.writeByte((byte) ' ');
+        out.writeCharSequence(String.valueOf(msg.destinationPort()), CharsetUtil.US_ASCII);
+        out.writeByte((byte) '\r');
+        out.writeByte((byte) '\n');
+        return out;
+    }
+
+    static final byte[] TEXT_PREFIX = {
+            (byte) 'P',
+            (byte) 'R',
+            (byte) 'O',
+            (byte) 'X',
+            (byte) 'Y',
+    };
+
     enum BackendState {
         Init, HandshakeCompleted
     }
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index 810aa1b..f6734a7 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -142,6 +142,11 @@ public class ProxyConfiguration implements PulsarConfiguration {
             + " If not set, the value of `InetAddress.getLocalHost().getCanonicalHostName()` is used."
     )
     private String advertisedAddress;
+
+    @FieldContext(category=CATEGORY_SERVER,
+            doc = "Enable or disable the proxy protocol.")
+    private boolean haProxyProtocolEnabled;
+
     @FieldContext(
         category = CATEGORY_SERVER,
         doc = "The port for serving binary protobuf request"
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index d0bc217..63e652c 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -27,6 +27,7 @@ import java.util.function.Supplier;
 import javax.naming.AuthenticationException;
 import javax.net.ssl.SSLSession;
 
+import io.netty.handler.codec.haproxy.HAProxyMessage;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authentication.AuthenticationProvider;
 import org.apache.pulsar.broker.authentication.AuthenticationState;
@@ -88,6 +89,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
     private boolean hasProxyToBrokerUrl;
     private int protocolVersionToAdvertise;
     private String proxyToBrokerUrl;
+    private HAProxyMessage haProxyMessage;
 
     enum State {
         Init,
@@ -169,6 +171,10 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
 
     @Override
     public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
+        if (msg instanceof HAProxyMessage) {
+            haProxyMessage = (HAProxyMessage) msg;
+            return;
+        }
         switch (state) {
         case Init:
         case Connecting:
@@ -458,6 +464,14 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
         return ctx;
     }
 
+    public boolean hasHAProxyMessage() {
+        return haProxyMessage != null;
+    }
+
+    public HAProxyMessage getHAProxyMessage() {
+        return haProxyMessage;
+    }
+
     private static final Logger LOG = LoggerFactory.getLogger(ProxyConnection.class);
 
 }
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
index 42a5c07..658dd87 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
@@ -25,6 +25,7 @@ import java.util.function.Supplier;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
 import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.OptionalProxyProtocolDecoder;
 import org.apache.pulsar.common.util.NettyClientSslContextRefresher;
 import org.apache.pulsar.common.util.NettyServerSslContextBuilder;
 
@@ -126,7 +127,9 @@ public class ServiceChannelInitializer extends ChannelInitializer<SocketChannel>
             ch.pipeline().addLast(TLS_HANDLER,
                     new SslHandler(serverSSLContextAutoRefreshBuilder.get().createSSLEngine()));
         }
-
+        if (proxyService.getConfiguration().isHaProxyProtocolEnabled()) {
+            ch.pipeline().addLast(OptionalProxyProtocolDecoder.NAME, new OptionalProxyProtocolDecoder());
+        }
         ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
                 Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
 
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java
new file mode 100644
index 0000000..1f0420c
--- /dev/null
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import lombok.Cleanup;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.ProducerImpl;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.policies.data.TopicStats;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.Optional;
+
+import static org.mockito.Mockito.doReturn;
+
+public class ProxyEnableHAProxyProtocolTest extends MockedPulsarServiceBaseTest {
+
+    private static final Logger log = LoggerFactory.getLogger(ProxyEnableHAProxyProtocolTest.class);
+
+    private final String DUMMY_VALUE = "DUMMY_VALUE";
+
+    private ProxyService proxyService;
+    private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+
+    @Override
+    @BeforeClass
+    protected void setup() throws Exception {
+        conf.setHaProxyProtocolEnabled(true);
+        internalSetup();
+
+        proxyConfig.setServicePort(Optional.ofNullable(0));
+        proxyConfig.setZookeeperServers(DUMMY_VALUE);
+        proxyConfig.setConfigurationStoreServers(DUMMY_VALUE);
+        proxyConfig.setHaProxyProtocolEnabled(true);
+
+        proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(
+                PulsarConfigurationLoader.convertFrom(proxyConfig))));
+        doReturn(mockZooKeeperClientFactory).when(proxyService).getZooKeeperClientFactory();
+
+        proxyService.start();
+    }
+
+    @Override
+    @AfterClass
+    protected void cleanup() throws Exception {
+        internalCleanup();
+
+        proxyService.close();
+    }
+
+    @Test
+    public void testSimpleProduceAndConsume() throws PulsarClientException, PulsarAdminException {
+        @Cleanup
+        PulsarClient client = PulsarClient.builder().serviceUrl(proxyService.getServiceUrl())
+                .build();
+
+        final String topicName = "persistent://sample/test/local/testSimpleProduceAndConsume";
+        final String subName = "my-subscriber-name";
+        final int messages = 100;
+
+        @Cleanup
+        org.apache.pulsar.client.api.Consumer<byte[]> consumer = client.newConsumer().topic(topicName).subscriptionName(subName)
+                .subscribe();
+
+        @Cleanup
+        org.apache.pulsar.client.api.Producer<byte[]> producer = client.newProducer().topic(topicName).create();
+        for (int i = 0; i < messages; i++) {
+            producer.send(("Message-" + i).getBytes());
+        }
+
+        int received = 0;
+        for (int i = 0; i < messages; i++) {
+            consumer.acknowledge(consumer.receive());
+            received++;
+        }
+
+        Assert.assertEquals(received, messages);
+
+        TopicStats topicStats = admin.topics().getStats(topicName);
+        Assert.assertEquals(topicStats.subscriptions.size(), 1);
+        SubscriptionStats subscriptionStats = topicStats.subscriptions.get(subName);
+        Assert.assertEquals(subscriptionStats.consumers.size(), 1);
+        Assert.assertEquals(subscriptionStats.consumers.get(0).getAddress(),
+                ((ConsumerImpl) consumer).getClientCnx().ctx().channel().localAddress().toString().replaceFirst("/", ""));
+
+        topicStats = admin.topics().getStats(topicName);
+        Assert.assertEquals(topicStats.publishers.size(), 1);
+        Assert.assertEquals(topicStats.publishers.get(0).getAddress(),
+                ((ProducerImpl) producer).getClientCnx().ctx().channel().localAddress().toString().replaceFirst("/", ""));
+    }
+}
diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE
index 23b2f95..99487ea 100644
--- a/pulsar-sql/presto-distribution/LICENSE
+++ b/pulsar-sql/presto-distribution/LICENSE
@@ -237,6 +237,7 @@ The Apache Software License, Version 2.0
     - netty-codec-4.1.48.Final.jar
     - netty-codec-dns-4.1.48.Final.jar
     - netty-codec-http-4.1.48.Final.jar
+    - netty-codec-haproxy-4.1.51.Final.jar
     - netty-common-4.1.48.Final.jar
     - netty-handler-4.1.48.Final.jar
     - netty-reactive-streams-2.0.4.jar