You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/11/30 03:26:58 UTC

[pulsar] branch master updated: Support HAProxy proxy protocol for broker and proxy (#8686)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 625627c  Support HAProxy proxy protocol for broker and proxy (#8686)
625627c is described below

commit 625627c8cc2915974a55c5fa7ae02fe33dd2e4c8
Author: lipenghui <pe...@apache.org>
AuthorDate: Mon Nov 30 11:26:37 2020 +0800

    Support HAProxy proxy protocol for broker and proxy (#8686)
    
    ### Motivation
    
    Currently, if the proxy is enabled in a Pulsar cluster and clients connect to the cluster through the proxy, the consumer and producer addresses reported in topic stats are not the real client addresses. This PR fixes the problem by leveraging the HAProxy proxy protocol, since it is a more general approach. For more details about the proxy protocol, see:
    https://www.haproxy.com/blog/haproxy/proxy-protocol/
    https://docs.aws.amazon.com/elasticloadbalancing/latest/classic/enable-proxy-protocol.html
    http://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
    
    ### Modifications
    
    Allow enabling the proxy protocol on the proxy or the broker.
    
    ### Verifying this change
    
    Tests added
---
 conf/broker.conf                                   |   3 +
 conf/proxy.conf                                    |   3 +
 conf/standalone.conf                               |   3 +
 distribution/server/src/assemble/LICENSE.bin.txt   |   1 +
 pom.xml                                            |   6 ++
 .../apache/pulsar/broker/ServiceConfiguration.java |   4 +
 .../org/apache/pulsar/broker/service/Consumer.java |  10 +-
 .../org/apache/pulsar/broker/service/Producer.java |   6 +-
 .../broker/service/PulsarChannelInitializer.java   |   4 +
 .../apache/pulsar/broker/service/ServerCnx.java    |  12 +++
 .../apache/pulsar/broker/service/TransportCnx.java |   5 +
 .../broker/service/EnableProxyProtocolTest.java    | 105 ++++++++++++++++++
 .../apache/pulsar/client/impl/ProducerImpl.java    |   2 +-
 pulsar-common/pom.xml                              |   5 +
 .../protocol/OptionalProxyProtocolDecoder.java     |  49 +++++++++
 .../pulsar/common/protocol/PulsarDecoder.java      |  11 ++
 .../pulsar/proxy/server/DirectProxyHandler.java    |  55 +++++++++-
 .../pulsar/proxy/server/ProxyConfiguration.java    |   5 +
 .../pulsar/proxy/server/ProxyConnection.java       |  14 +++
 .../proxy/server/ServiceChannelInitializer.java    |   5 +-
 .../server/ProxyEnableHAProxyProtocolTest.java     | 119 +++++++++++++++++++++
 pulsar-sql/presto-distribution/LICENSE             |   1 +
 22 files changed, 423 insertions(+), 5 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index bc09f3a..f22fb3c 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -55,6 +55,9 @@ advertisedAddress=
 # The Default value is absent, the broker uses the first listener as the internal listener.
 # internalListenerName=
 
+# Enable or disable the HAProxy protocol.
+haProxyProtocolEnabled=false
+
 # Number of threads to use for Netty IO. Default is set to 2 * Runtime.getRuntime().availableProcessors()
 numIOThreads=
 
diff --git a/conf/proxy.conf b/conf/proxy.conf
index ae8790e..820eca3 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -50,6 +50,9 @@ zooKeeperCacheExpirySeconds=300
 # If not set, the value of `InetAddress.getLocalHost().getHostname()` is used.
 advertisedAddress=
 
+# Enable or disable the HAProxy protocol.
+haProxyProtocolEnabled=false
+
 # The port to use for server binary Protobuf requests
 servicePort=6650
 
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 7f8b5fa..2f38f0e 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -36,6 +36,9 @@ bindAddress=0.0.0.0
 # Hostname or IP address the service advertises to the outside world. If not set, the value of InetAddress.getLocalHost().getHostName() is used.
 advertisedAddress=
 
+# Enable or disable the HAProxy protocol.
+haProxyProtocolEnabled=false
+
 # Number of threads to use for Netty IO. Default is set to 2 * Runtime.getRuntime().availableProcessors()
 numIOThreads=
 
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index cc7c915..632e8b5 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -363,6 +363,7 @@ The Apache Software License, Version 2.0
     - io.netty-netty-codec-http-4.1.51.Final.jar
     - io.netty-netty-codec-http2-4.1.51.Final.jar
     - io.netty-netty-codec-socks-4.1.51.Final.jar
+    - io.netty-netty-codec-haproxy-4.1.51.Final.jar
     - io.netty-netty-common-4.1.51.Final.jar
     - io.netty-netty-handler-4.1.51.Final.jar
     - io.netty-netty-handler-proxy-4.1.51.Final.jar
diff --git a/pom.xml b/pom.xml
index d1bd700..c64fb71 100644
--- a/pom.xml
+++ b/pom.xml
@@ -463,6 +463,12 @@ flexible messaging model and an intuitive client API.</description>
       </dependency>
 
       <dependency>
+        <groupId>io.netty</groupId>
+        <artifactId>netty-codec-haproxy</artifactId>
+        <version>${netty.version}</version>
+      </dependency>
+
+      <dependency>
         <groupId>com.beust</groupId>
         <artifactId>jcommander</artifactId>
         <version>${jcommander.version}</version>
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 2fafb55..f6e3d35 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -163,6 +163,10 @@ public class ServiceConfiguration implements PulsarConfiguration {
                     + "The Default value is absent, the broker uses the first listener as the internal listener.")
     private String internalListenerName;
 
+    @FieldContext(category=CATEGORY_SERVER,
+            doc = "Enable or disable the proxy protocol.")
+    private boolean haProxyProtocolEnabled;
+
     @FieldContext(
         category = CATEGORY_SERVER,
         doc = "Number of threads to use for Netty IO."
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index a98f68b..49b0201 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -162,7 +162,11 @@ public class Consumer {
         this.metadata = metadata != null ? metadata : Collections.emptyMap();
 
         stats = new ConsumerStats();
-        stats.setAddress(cnx.clientAddress().toString());
+        if (cnx.hasHAProxyMessage()) {
+            stats.setAddress(cnx.getHAProxyMessage().sourceAddress() + ":" + cnx.getHAProxyMessage().sourcePort());
+        } else {
+            stats.setAddress(cnx.clientAddress().toString());
+        }
         stats.consumerName = consumerName;
         stats.setConnectedSince(DateFormatter.now());
         stats.setClientVersion(cnx.getClientVersion());
@@ -764,5 +768,9 @@ public class Consumer {
         this.readPositionWhenJoining = readPositionWhenJoining;
     }
 
+    public TransportCnx cnx() {
+        return cnx;
+    }
+
     private static final Logger log = LoggerFactory.getLogger(Consumer.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index 605905b..8eebaa3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -109,7 +109,11 @@ public class Producer {
         this.metadata = metadata != null ? metadata : Collections.emptyMap();
 
         this.stats = isNonPersistentTopic ? new NonPersistentPublisherStats() : new PublisherStats();
-        stats.setAddress(cnx.clientAddress().toString());
+        if (cnx.hasHAProxyMessage()) {
+            stats.setAddress(cnx.getHAProxyMessage().sourceAddress() + ":" + cnx.getHAProxyMessage().sourcePort());
+        } else {
+            stats.setAddress(cnx.clientAddress().toString());
+        }
         stats.setConnectedSince(DateFormatter.now());
         stats.setClientVersion(cnx.getClientVersion());
         stats.setProducerName(producerName);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
index 7425ffb..ae8fe85 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
@@ -35,6 +35,7 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.common.protocol.ByteBufPair;
 import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.OptionalProxyProtocolDecoder;
 import org.apache.pulsar.common.util.NettyServerSslContextBuilder;
 import org.apache.pulsar.common.util.SslContextAutoRefreshBuilder;
 import org.apache.pulsar.common.util.keystoretls.NettySSLContextAutoRefreshBuilder;
@@ -118,6 +119,9 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel>
             ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
         }
 
+        if (pulsar.getConfiguration().isHaProxyProtocolEnabled()) {
+            ch.pipeline().addLast(OptionalProxyProtocolDecoder.NAME, new OptionalProxyProtocolDecoder());
+        }
         ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
             brokerConf.getMaxMessageSize() + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
         // https://stackoverflow.com/questions/37535482/netty-disabling-auto-read-doesnt-work-for-bytetomessagedecoder
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index c5ef5fe..a99ee3e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -21,6 +21,8 @@ package org.apache.pulsar.broker.service;
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.unsafeGetPartitionedTopicMetadataAsync;
+
+import io.netty.handler.codec.haproxy.HAProxyMessage;
 import org.apache.pulsar.broker.intercept.BrokerInterceptor;
 import static org.apache.pulsar.broker.lookup.TopicLookupBase.lookupTopicAsync;
 import static org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion.v5;
@@ -2105,6 +2107,16 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         return ctx.newPromise();
     }
 
+    @Override
+    public HAProxyMessage getHAProxyMessage() {
+        return proxyMessage;
+    }
+
+    @Override
+    public boolean hasHAProxyMessage() {
+        return proxyMessage != null;
+    }
+
     boolean hasConsumer(long consumerId) {
         return consumers.containsKey(consumerId);
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java
index 760441a..f2744ad 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service;
 
+import io.netty.handler.codec.haproxy.HAProxyMessage;
 import io.netty.util.concurrent.Promise;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 
@@ -73,4 +74,8 @@ public interface TransportCnx {
 
     Promise<Void> newPromise();
 
+    boolean hasHAProxyMessage();
+
+    HAProxyMessage getHAProxyMessage();
+
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/EnableProxyProtocolTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/EnableProxyProtocolTest.java
new file mode 100644
index 0000000..2143c07
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/EnableProxyProtocolTest.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import io.netty.buffer.Unpooled;
+import lombok.Cleanup;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.policies.data.TopicStats;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+public class EnableProxyProtocolTest extends BrokerTestBase  {
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        conf.setHaProxyProtocolEnabled(true);
+        super.baseSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testSimpleProduceAndConsume() throws PulsarClientException {
+        final String namespace = "prop/ns-abc";
+        final String topicName = "persistent://" + namespace + "/testSimpleProduceAndConsume";
+        final String subName = "my-subscriber-name";
+        final int messages = 100;
+
+        @Cleanup
+        org.apache.pulsar.client.api.Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
+                .subscribe();
+
+        @Cleanup
+        org.apache.pulsar.client.api.Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+        for (int i = 0; i < messages; i++) {
+            producer.send(("Message-" + i).getBytes());
+        }
+
+        int received = 0;
+        for (int i = 0; i < messages; i++) {
+            consumer.acknowledge(consumer.receive());
+            received++;
+        }
+
+        Assert.assertEquals(received, messages);
+    }
+
+    @Test
+    public void testProxyProtocol() throws PulsarClientException, ExecutionException, InterruptedException, PulsarAdminException {
+        final String namespace = "prop/ns-abc";
+        final String topicName = "persistent://" + namespace + "/testProxyProtocol";
+        final String subName = "my-subscriber-name";
+        PulsarClientImpl client = (PulsarClientImpl) pulsarClient;
+        CompletableFuture<ClientCnx> cnx = client.getCnxPool().getConnection(InetSocketAddress.createUnresolved("localhost", pulsar.getBrokerListenPort().get()));
+        // Simulate the proxy protocol message
+        cnx.get().ctx().channel().writeAndFlush(Unpooled.copiedBuffer("PROXY TCP4 198.51.100.22 203.0.113.7 35646 80\r\n".getBytes()));
+        pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
+                .subscribe();
+        org.apache.pulsar.broker.service.Consumer c = pulsar.getBrokerService().getTopicReference(topicName).get().getSubscription(subName).getConsumers().get(0);
+        Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> Assert.assertTrue(c.cnx().hasHAProxyMessage()));
+        TopicStats topicStats = admin.topics().getStats(topicName);
+        Assert.assertEquals(topicStats.subscriptions.size(), 1);
+        SubscriptionStats subscriptionStats = topicStats.subscriptions.get(subName);
+        Assert.assertEquals(subscriptionStats.consumers.size(), 1);
+        Assert.assertEquals(subscriptionStats.consumers.get(0).getAddress(), "198.51.100.22:35646");
+
+        pulsarClient.newProducer().topic(topicName).create();
+        topicStats = admin.topics().getStats(topicName);
+        Assert.assertEquals(topicStats.publishers.size(), 1);
+        Assert.assertEquals(topicStats.publishers.get(0).getAddress(), "198.51.100.22:35646");
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 8d09606..1efc0c9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1731,7 +1731,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         this.connectionHandler.connectionClosed(cnx);
     }
 
-    ClientCnx getClientCnx() {
+    public ClientCnx getClientCnx() {
         return this.connectionHandler.cnx();
     }
 
diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml
index d08dfb8..449ddb6 100644
--- a/pulsar-common/pom.xml
+++ b/pulsar-common/pom.xml
@@ -128,6 +128,11 @@
     </dependency>
 
     <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-codec-haproxy</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>org.eclipse.jetty</groupId>
       <artifactId>jetty-util</artifactId>
     </dependency>
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/OptionalProxyProtocolDecoder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/OptionalProxyProtocolDecoder.java
new file mode 100644
index 0000000..aa1c55e
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/OptionalProxyProtocolDecoder.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.protocol;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.ProtocolDetectionResult;
+import io.netty.handler.codec.ProtocolDetectionState;
+import io.netty.handler.codec.haproxy.HAProxyMessageDecoder;
+import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
+
+/**
+ * Detects whether a new connection is prefixed with the ProxyProtocol and, if so, adds a decoder for it.
+ * For more about the ProxyProtocol, see: http://www.haproxy.org/download/1.8/doc/proxy-protocol.txt.
+ */
+public class OptionalProxyProtocolDecoder extends ChannelInboundHandlerAdapter {
+
+    public static final String NAME = "optional-proxy-protocol-decoder";
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        if (msg instanceof ByteBuf) {
+            ProtocolDetectionResult<HAProxyProtocolVersion> result =
+                    HAProxyMessageDecoder.detectProtocol((ByteBuf) msg);
+            if (result.state() == ProtocolDetectionState.DETECTED) {
+                ctx.pipeline().addAfter(NAME, null, new HAProxyMessageDecoder());
+                ctx.pipeline().remove(this);
+            }
+        }
+        super.channelRead(ctx, msg);
+    }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
index 123d2dc..ce4b64c 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
@@ -24,6 +24,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 
+import io.netty.handler.codec.haproxy.HAProxyMessage;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck;
@@ -85,8 +86,18 @@ import org.slf4j.LoggerFactory;
  */
 public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter {
 
+    // From the proxy protocol. If present, it means the client is connected via a reverse proxy.
+    // The broker can get the real client address and proxy address from the proxy message.
+    protected HAProxyMessage proxyMessage;
+
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        if (msg instanceof HAProxyMessage) {
+            HAProxyMessage proxyMessage = (HAProxyMessage) msg;
+            this.proxyMessage = proxyMessage;
+            proxyMessage.release();
+            return;
+        }
         // Get a buffer that contains the full frame
         ByteBuf buffer = (ByteBuf) msg;
         BaseCommand cmd = null;
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index 23d3ecb..44d7ad5 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -24,6 +24,7 @@ import static com.google.common.base.Preconditions.checkState;
 
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandler;
@@ -33,10 +34,16 @@ import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.codec.haproxy.HAProxyCommand;
+import io.netty.handler.codec.haproxy.HAProxyMessage;
+import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
+import io.netty.handler.codec.haproxy.HAProxyProxiedProtocol;
 import io.netty.handler.ssl.SslHandler;
+import io.netty.util.CharsetUtil;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.FutureListener;
 
+import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Map;
@@ -143,10 +150,56 @@ public class DirectProxyHandler {
                 inboundOutboundChannelMap.put(outboundChannel.id() , inboundChannel.id());
             }
 
-
+            if (config.isHaProxyProtocolEnabled()) {
+                if (proxyConnection.hasHAProxyMessage()) {
+                    outboundChannel.writeAndFlush(encodeProxyProtocolMessage(proxyConnection.getHAProxyMessage()));
+                } else {
+                    if (inboundChannel.remoteAddress() instanceof InetSocketAddress) {
+                        InetSocketAddress clientAddress = (InetSocketAddress) inboundChannel.remoteAddress();
+                        String sourceAddress = clientAddress.getAddress().getHostAddress();
+                        int sourcePort = clientAddress.getPort();
+                        if (outboundChannel.localAddress() instanceof InetSocketAddress) {
+                            InetSocketAddress proxyAddress = (InetSocketAddress) inboundChannel.remoteAddress();
+                            String destinationAddress = proxyAddress.getAddress().getHostAddress();
+                            int destinationPort = proxyAddress.getPort();
+                            HAProxyMessage msg = new HAProxyMessage(HAProxyProtocolVersion.V1, HAProxyCommand.PROXY,
+                                    HAProxyProxiedProtocol.TCP4, sourceAddress, destinationAddress, sourcePort, destinationPort);
+                            outboundChannel.writeAndFlush(encodeProxyProtocolMessage(msg));
+                            msg.release();
+                        }
+                    }
+                }
+            }
         });
     }
 
+    private ByteBuf encodeProxyProtocolMessage(HAProxyMessage msg) {
+        // Max length of v1 version proxy protocol message is 108
+        ByteBuf out = Unpooled.buffer(108);
+        out.writeBytes(TEXT_PREFIX);
+        out.writeByte((byte) ' ');
+        out.writeCharSequence(msg.proxiedProtocol().name(), CharsetUtil.US_ASCII);
+        out.writeByte((byte) ' ');
+        out.writeCharSequence(msg.sourceAddress(), CharsetUtil.US_ASCII);
+        out.writeByte((byte) ' ');
+        out.writeCharSequence(msg.destinationAddress(), CharsetUtil.US_ASCII);
+        out.writeByte((byte) ' ');
+        out.writeCharSequence(String.valueOf(msg.sourcePort()), CharsetUtil.US_ASCII);
+        out.writeByte((byte) ' ');
+        out.writeCharSequence(String.valueOf(msg.destinationPort()), CharsetUtil.US_ASCII);
+        out.writeByte((byte) '\r');
+        out.writeByte((byte) '\n');
+        return out;
+    }
+
+    static final byte[] TEXT_PREFIX = {
+            (byte) 'P',
+            (byte) 'R',
+            (byte) 'O',
+            (byte) 'X',
+            (byte) 'Y',
+    };
+
     enum BackendState {
         Init, HandshakeCompleted
     }
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index ace2c4e..52a34a0 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -148,6 +148,11 @@ public class ProxyConfiguration implements PulsarConfiguration {
             + " If not set, the value of `InetAddress.getLocalHost().getCanonicalHostName()` is used."
     )
     private String advertisedAddress;
+
+    @FieldContext(category=CATEGORY_SERVER,
+            doc = "Enable or disable the proxy protocol.")
+    private boolean haProxyProtocolEnabled;
+
     @FieldContext(
         category = CATEGORY_SERVER,
         doc = "The port for serving binary protobuf request"
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index d0bc217..63e652c 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -27,6 +27,7 @@ import java.util.function.Supplier;
 import javax.naming.AuthenticationException;
 import javax.net.ssl.SSLSession;
 
+import io.netty.handler.codec.haproxy.HAProxyMessage;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authentication.AuthenticationProvider;
 import org.apache.pulsar.broker.authentication.AuthenticationState;
@@ -88,6 +89,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
     private boolean hasProxyToBrokerUrl;
     private int protocolVersionToAdvertise;
     private String proxyToBrokerUrl;
+    private HAProxyMessage haProxyMessage;
 
     enum State {
         Init,
@@ -169,6 +171,10 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
 
     @Override
     public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
+        if (msg instanceof HAProxyMessage) {
+            haProxyMessage = (HAProxyMessage) msg;
+            return;
+        }
         switch (state) {
         case Init:
         case Connecting:
@@ -458,6 +464,14 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
         return ctx;
     }
 
+    public boolean hasHAProxyMessage() {
+        return haProxyMessage != null;
+    }
+
+    public HAProxyMessage getHAProxyMessage() {
+        return haProxyMessage;
+    }
+
     private static final Logger LOG = LoggerFactory.getLogger(ProxyConnection.class);
 
 }
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
index 42a5c07..658dd87 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
@@ -25,6 +25,7 @@ import java.util.function.Supplier;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
 import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.OptionalProxyProtocolDecoder;
 import org.apache.pulsar.common.util.NettyClientSslContextRefresher;
 import org.apache.pulsar.common.util.NettyServerSslContextBuilder;
 
@@ -126,7 +127,9 @@ public class ServiceChannelInitializer extends ChannelInitializer<SocketChannel>
             ch.pipeline().addLast(TLS_HANDLER,
                     new SslHandler(serverSSLContextAutoRefreshBuilder.get().createSSLEngine()));
         }
-
+        if (proxyService.getConfiguration().isHaProxyProtocolEnabled()) {
+            ch.pipeline().addLast(OptionalProxyProtocolDecoder.NAME, new OptionalProxyProtocolDecoder());
+        }
         ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
                 Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
 
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java
new file mode 100644
index 0000000..1f0420c
--- /dev/null
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import lombok.Cleanup;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.ProducerImpl;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.policies.data.TopicStats;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.Optional;
+
+import static org.mockito.Mockito.doReturn;
+
+/** Verifies topic stats carry the real client address when haProxyProtocolEnabled is set on conf and the proxy. */
+public class ProxyEnableHAProxyProtocolTest extends MockedPulsarServiceBaseTest {
+
+    private static final Logger log = LoggerFactory.getLogger(ProxyEnableHAProxyProtocolTest.class);
+    private static final String DUMMY_VALUE = "DUMMY_VALUE";
+
+    private ProxyService proxyService;
+    private final ProxyConfiguration proxyConfig = new ProxyConfiguration();
+
+    /** Runs the inherited setup and starts a proxy on an ephemeral port, both with haProxyProtocolEnabled set. */
+    @Override
+    @BeforeClass
+    protected void setup() throws Exception {
+        conf.setHaProxyProtocolEnabled(true);
+        internalSetup();
+
+        proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setZookeeperServers(DUMMY_VALUE);
+        proxyConfig.setConfigurationStoreServers(DUMMY_VALUE);
+        proxyConfig.setHaProxyProtocolEnabled(true);
+
+        proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(
+                PulsarConfigurationLoader.convertFrom(proxyConfig))));
+        doReturn(mockZooKeeperClientFactory).when(proxyService).getZooKeeperClientFactory();
+
+        proxyService.start();
+    }
+
+    /** Runs the inherited cleanup, then closes the proxy even when that cleanup throws. */
+    @Override
+    @AfterClass
+    protected void cleanup() throws Exception {
+        try {
+            internalCleanup();
+        } finally {
+            proxyService.close();
+        }
+    }
+
+    /** Round-trips messages via the proxy and asserts topic stats report the client's local socket address. */
+    @Test
+    public void testSimpleProduceAndConsume() throws PulsarClientException, PulsarAdminException {
+        @Cleanup
+        PulsarClient client = PulsarClient.builder().serviceUrl(proxyService.getServiceUrl()).build();
+
+        final String topicName = "persistent://sample/test/local/testSimpleProduceAndConsume";
+        final String subName = "my-subscriber-name";
+        final int messages = 100;
+
+        @Cleanup
+        org.apache.pulsar.client.api.Consumer<byte[]> consumer = client.newConsumer().topic(topicName)
+                .subscriptionName(subName).subscribe();
+        @Cleanup
+        org.apache.pulsar.client.api.Producer<byte[]> producer = client.newProducer().topic(topicName).create();
+        for (int i = 0; i < messages; i++) {
+            producer.send(("Message-" + i).getBytes(java.nio.charset.StandardCharsets.UTF_8));
+        }
+
+        for (int i = 0; i < messages; i++) {
+            // Bounded receive: a lost message must fail the test instead of hanging it forever.
+            org.apache.pulsar.client.api.Message<byte[]> msg = consumer.receive(10, java.util.concurrent.TimeUnit.SECONDS);
+            Assert.assertNotNull(msg, "Timed out waiting for message " + i);
+            consumer.acknowledge(msg);
+        }
+
+        TopicStats topicStats = admin.topics().getStats(topicName);
+        Assert.assertEquals(topicStats.subscriptions.size(), 1);
+        SubscriptionStats subscriptionStats = topicStats.subscriptions.get(subName);
+        Assert.assertEquals(subscriptionStats.consumers.size(), 1);
+        Assert.assertEquals(subscriptionStats.consumers.get(0).getAddress(),
+                ((ConsumerImpl<byte[]>) consumer).getClientCnx().ctx().channel().localAddress().toString().replaceFirst("/", ""));
+        Assert.assertEquals(topicStats.publishers.size(), 1);
+        Assert.assertEquals(topicStats.publishers.get(0).getAddress(),
+                ((ProducerImpl<byte[]>) producer).getClientCnx().ctx().channel().localAddress().toString().replaceFirst("/", ""));
+    }
+}
diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE
index e273452..80e5759 100644
--- a/pulsar-sql/presto-distribution/LICENSE
+++ b/pulsar-sql/presto-distribution/LICENSE
@@ -237,6 +237,7 @@ The Apache Software License, Version 2.0
     - netty-codec-4.1.51.Final.jar
     - netty-codec-dns-4.1.51.Final.jar
     - netty-codec-http-4.1.51.Final.jar
+    - netty-codec-haproxy-4.1.51.Final.jar
     - netty-common-4.1.51.Final.jar
     - netty-handler-4.1.51.Final.jar
     - netty-reactive-streams-2.0.4.jar