You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2024/04/22 05:55:56 UTC

(pulsar) branch branch-3.2 updated: [improve][broker] Support X-Forwarded-For and HA Proxy Protocol for resolving original client IP of http/https requests (#22524)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 28bce5022a6 [improve][broker] Support X-Forwarded-For and HA Proxy Protocol for resolving original client IP of http/https requests (#22524)
28bce5022a6 is described below

commit 28bce5022a608a6072b9489ee0a2b5d95b36d1ae
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Mon Apr 22 07:49:34 2024 +0300

    [improve][broker] Support X-Forwarded-For and HA Proxy Protocol for resolving original client IP of http/https requests (#22524)
    
    (cherry picked from commit 4a887217d835629cafb393ddf331441b484d4e2c)
---
 conf/broker.conf                                   |  10 ++
 conf/functions_worker.yml                          |  10 ++
 conf/proxy.conf                                    |  10 ++
 conf/standalone.conf                               |  10 ++
 conf/websocket.conf                                |  10 ++
 pom.xml                                            |   1 +
 .../apache/pulsar/broker/ServiceConfiguration.java |  16 ++
 .../pulsar/broker/web/JettyRequestLogFactory.java  | 195 ++++++++++++++++++++-
 pulsar-broker/pom.xml                              |   7 +
 .../org/apache/pulsar/broker/web/WebService.java   |  37 +++-
 .../broker/web/WebServiceOriginalClientIPTest.java | 155 ++++++++++++++++
 pulsar-broker/src/test/resources/log4j2.xml        |  40 +++++
 .../pulsar/functions/worker/WorkerConfig.java      |  16 ++
 .../pulsar/functions/worker/rest/WorkerServer.java |  38 +++-
 pulsar-proxy/pom.xml                               |   6 +
 .../pulsar/proxy/server/ProxyConfiguration.java    |  16 ++
 .../pulsar/proxy/server/ProxyServiceStarter.java   |  31 +++-
 .../org/apache/pulsar/proxy/server/WebServer.java  |  34 +++-
 .../proxy/server/ProxyOriginalClientIPTest.java    | 157 +++++++++++++++++
 .../ProxyServiceStarterDisableZeroCopyTest.java    |   2 +-
 .../proxy/server/ProxyServiceStarterTest.java      |   2 +-
 .../proxy/server/ProxyServiceTlsStarterTest.java   |   2 +-
 pulsar-proxy/src/test/resources/log4j2.xml         |  36 ++++
 .../pulsar/websocket/service/ProxyServer.java      |  39 ++++-
 .../service/WebSocketProxyConfiguration.java       |  14 ++
 25 files changed, 873 insertions(+), 21 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 4f5c4cac565..2d5221c65c4 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -88,6 +88,16 @@ advertisedAddress=
 # If true, the real IP addresses of consumers and producers can be obtained when getting topic statistics data.
 haProxyProtocolEnabled=false
 
+# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests.
+webServiceHaProxyProtocolEnabled=false
+
+# Trust X-Forwarded-For header for resolving the client IP for http/https requests. Default is false.
+webServiceTrustXForwardedFor=false
+
+# Add detailed client/remote and server/local addresses and ports to http/https request logging.
+# Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor is enabled.
+webServiceLogDetailedAddresses=
+
 # Number of threads to config Netty Acceptor. Default is 1
 numAcceptorThreads=
 
diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index 3871c74a887..6f995576ebd 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -27,6 +27,16 @@ workerHostname: localhost
 workerPort: 6750
 workerPortTls: 6751
 
+# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests.
+webServiceHaProxyProtocolEnabled: false
+
+# Trust X-Forwarded-For header for resolving the client IP for http/https requests. Default is false.
+webServiceTrustXForwardedFor: false
+
+# Add detailed client/remote and server/local addresses and ports to http/https request logging.
+# Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor is enabled.
+webServiceLogDetailedAddresses: null
+
 # The Configuration metadata store url
 # Examples:
 # * zk:my-zk-1:2181,my-zk-2:2181,my-zk-3:2181
diff --git a/conf/proxy.conf b/conf/proxy.conf
index 5a9d433f39c..6e6c960e800 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -63,6 +63,16 @@ advertisedAddress=
 # If true, the real IP addresses of consumers and producers can be obtained when getting topic statistics data.
 haProxyProtocolEnabled=false
 
+# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests.
+webServiceHaProxyProtocolEnabled=false
+
+# Trust X-Forwarded-For header for resolving the client IP for http/https requests. Default is false.
+webServiceTrustXForwardedFor=false
+
+# Add detailed client/remote and server/local addresses and ports to http/https request logging.
+# Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor is enabled.
+webServiceLogDetailedAddresses=
+
 # Enables zero-copy transport of data across network interfaces using the splice system call.
 # Zero copy mode cannot be used when TLS is enabled or when proxyLogLevel is > 0.
 proxyZeroCopyModeEnabled=true
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 21b57067ad2..316143ab49d 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -51,6 +51,16 @@ advertisedAddress=
 # If true, the real IP addresses of consumers and producers can be obtained when getting topic statistics data.
 haProxyProtocolEnabled=false
 
+# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests.
+webServiceHaProxyProtocolEnabled=false
+
+# Trust X-Forwarded-For header for resolving the client IP for http/https requests. Default is false.
+webServiceTrustXForwardedFor=false
+
+# Add detailed client/remote and server/local addresses and ports to http/https request logging.
+# Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor is enabled.
+webServiceLogDetailedAddresses=
+
 # Number of threads to use for Netty IO. Default is set to 2 * Runtime.getRuntime().availableProcessors()
 numIOThreads=
 
diff --git a/conf/websocket.conf b/conf/websocket.conf
index 490cff2722e..9051f3b590c 100644
--- a/conf/websocket.conf
+++ b/conf/websocket.conf
@@ -46,6 +46,16 @@ statusFilePath=
 # Hostname or IP address the service binds on, default is 0.0.0.0.
 bindAddress=0.0.0.0
 
+# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests.
+webServiceHaProxyProtocolEnabled=false
+
+# Trust X-Forwarded-For header for resolving the client IP for http/https requests. Default is false.
+webServiceTrustXForwardedFor=false
+
+# Add detailed client/remote and server/local addresses and ports to http/https request logging.
+# Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor is enabled.
+webServiceLogDetailedAddresses=
+
 # Name of the pulsar cluster to connect to
 clusterName=
 
diff --git a/pom.xml b/pom.xml
index 8556027b3f9..bd41ebbbed9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -272,6 +272,7 @@ flexible messaging model and an intuitive client API.</description>
     <jettison.version>1.5.4</jettison.version>
     <woodstox.version>5.4.0</woodstox.version>
     <wiremock.version>2.33.2</wiremock.version>
+    <consolecaptor.version>1.0.3</consolecaptor.version>
 
     <!-- Plugin dependencies -->
     <protobuf-maven-plugin.version>0.6.1</protobuf-maven-plugin.version>
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 0fce252694f..d1f2e9b585f 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -250,6 +250,22 @@ public class ServiceConfiguration implements PulsarConfiguration {
                     + " when getting topic statistics data.")
     private boolean haProxyProtocolEnabled;
 
+    @FieldContext(category = CATEGORY_SERVER,
+            doc = "Enable or disable the use of HA proxy protocol for resolving the client IP for http/https "
+                    + "requests. Default is false.")
+    private boolean webServiceHaProxyProtocolEnabled = false;
+
+    @FieldContext(category = CATEGORY_SERVER, doc =
+            "Trust X-Forwarded-For header for resolving the client IP for http/https requests.\n"
+                    + "Default is false.")
+    private boolean webServiceTrustXForwardedFor = false;
+
+    @FieldContext(category = CATEGORY_SERVER, doc =
+            "Add detailed client/remote and server/local addresses and ports to http/https request logging.\n"
+                    + "Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor "
+                    + "is enabled.")
+    private Boolean webServiceLogDetailedAddresses;
+
     @FieldContext(
             category = CATEGORY_SERVER,
             doc = "Number of threads to use for Netty Acceptor."
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/JettyRequestLogFactory.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/JettyRequestLogFactory.java
index e5daa5852b5..fc88647eb49 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/JettyRequestLogFactory.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/JettyRequestLogFactory.java
@@ -18,9 +18,23 @@
  */
 package org.apache.pulsar.broker.web;
 
+import java.net.InetSocketAddress;
 import java.util.TimeZone;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.jetty.io.Connection;
+import org.eclipse.jetty.io.EndPoint;
+import org.eclipse.jetty.server.Connector;
 import org.eclipse.jetty.server.CustomRequestLog;
+import org.eclipse.jetty.server.ProxyConnectionFactory;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.RequestLog;
+import org.eclipse.jetty.server.Response;
+import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.Slf4jRequestLogWriter;
+import org.eclipse.jetty.util.HostPort;
+import org.eclipse.jetty.util.component.ContainerLifeCycle;
 
 /**
  * Class to standardize initialization of a Jetty request logger for all pulsar components.
@@ -58,7 +72,184 @@ public class JettyRequestLogFactory {
      * Build a new Jetty request logger using the format defined in this class.
      * @return a request logger
      */
-    public static CustomRequestLog createRequestLogger() {
-        return new CustomRequestLog(new Slf4jRequestLogWriter(), LOG_FORMAT);
+    public static RequestLog createRequestLogger() {
+        return createRequestLogger(false, null);
+    }
+
+    /**
+     * Build a new Jetty request logger using the format defined in this class.
+     * @param showDetailedAddresses whether to show detailed addresses and ports in logs
+     * @return a request logger
+     */
+    public static RequestLog createRequestLogger(boolean showDetailedAddresses, Server server) {
+        if (!showDetailedAddresses) {
+            return new CustomRequestLog(new Slf4jRequestLogWriter(), LOG_FORMAT);
+        } else {
+            return new OriginalClientIPRequestLog(server);
+        }
+    }
+
+    /**
+     * Logs the original and real remote (client) and local (server) IP addresses
+     * when detailed addresses are enabled.
+     * Tracks the real addresses of remote and local using a registered Connection.Listener
+     * when detailed addresses are enabled.
+     * This is necessary when Proxy Protocol is used to pass the original client IP.
+     */
+    @Slf4j
+    private static class OriginalClientIPRequestLog extends ContainerLifeCycle implements RequestLog {
+        private final ThreadLocal<StringBuilder> requestLogStringBuilder = ThreadLocal.withInitial(StringBuilder::new);
+        private final CustomRequestLog delegate;
+        private final Slf4jRequestLogWriter delegateLogWriter;
+
+        OriginalClientIPRequestLog(Server server) {
+            delegate = new CustomRequestLog(this::write, LOG_FORMAT);
+            addBean(delegate);
+            delegateLogWriter = new Slf4jRequestLogWriter();
+            addBean(delegateLogWriter);
+            if (server != null) {
+                for (Connector connector : server.getConnectors()) {
+                    // adding the listener is only necessary for connectors that use ProxyConnectionFactory
+                    if (connector.getDefaultConnectionFactory() instanceof ProxyConnectionFactory) {
+                        connector.addBean(proxyProtocolOriginalEndpointListener);
+                    }
+                }
+            }
+        }
+
+        void write(String requestEntry) {
+            StringBuilder sb = requestLogStringBuilder.get();
+            sb.setLength(0);
+            sb.append(requestEntry);
+        }
+
+        @Override
+        public void log(Request request, Response response) {
+            delegate.log(request, response);
+            StringBuilder sb = requestLogStringBuilder.get();
+            sb.append(" [R:");
+            sb.append(request.getRemoteHost());
+            sb.append(':');
+            sb.append(request.getRemotePort());
+            InetSocketAddress realRemoteAddress = lookupRealAddress(request.getHttpChannel().getRemoteAddress());
+            if (realRemoteAddress != null) {
+                String realRemoteHost = HostPort.normalizeHost(realRemoteAddress.getHostString());
+                int realRemotePort = realRemoteAddress.getPort();
+                if (!realRemoteHost.equals(request.getRemoteHost()) || realRemotePort != request.getRemotePort()) {
+                    sb.append(" via ");
+                    sb.append(realRemoteHost);
+                    sb.append(':');
+                    sb.append(realRemotePort);
+                }
+            }
+            sb.append("]->[L:");
+            InetSocketAddress realLocalAddress = lookupRealAddress(request.getHttpChannel().getLocalAddress());
+            if (realLocalAddress != null) {
+                String realLocalHost = HostPort.normalizeHost(realLocalAddress.getHostString());
+                int realLocalPort = realLocalAddress.getPort();
+                sb.append(realLocalHost);
+                sb.append(':');
+                sb.append(realLocalPort);
+                if (!realLocalHost.equals(request.getLocalAddr()) || realLocalPort != request.getLocalPort()) {
+                    sb.append(" dst ");
+                    sb.append(request.getLocalAddr());
+                    sb.append(':');
+                    sb.append(request.getLocalPort());
+                }
+            } else {
+                sb.append(request.getLocalAddr());
+                sb.append(':');
+                sb.append(request.getLocalPort());
+            }
+            sb.append(']');
+            try {
+                delegateLogWriter.write(sb.toString());
+            } catch (Exception e) {
+                log.warn("Failed to write request log", e);
+            }
+        }
+
+        private InetSocketAddress lookupRealAddress(InetSocketAddress socketAddress) {
+            if (socketAddress == null) {
+                return null;
+            }
+            if (proxyProtocolRealAddressMapping.isEmpty()) {
+                return socketAddress;
+            }
+            AddressEntry entry = proxyProtocolRealAddressMapping.get(new AddressKey(socketAddress.getHostString(),
+                    socketAddress.getPort()));
+            if (entry != null) {
+                return entry.realAddress;
+            } else {
+                return socketAddress;
+            }
+        }
+
+        private final Connection.Listener proxyProtocolOriginalEndpointListener =
+                new ProxyProtocolOriginalEndpointListener();
+
+        private final ConcurrentHashMap<AddressKey, AddressEntry> proxyProtocolRealAddressMapping =
+                new ConcurrentHashMap<>();
+
+        // Use a record as key since InetSocketAddress hash code changes if the address gets resolved
+        record AddressKey(String hostString, int port) {
+
+        }
+
+        record AddressEntry(InetSocketAddress realAddress, AtomicInteger referenceCount) {
+
+        }
+
+        // Tracks the real addresses of remote and local when detailed addresses are enabled.
+        // This is necessary when Proxy Protocol is used to pass the original client IP.
+        // The Proxy Protocol implementation in Jetty wraps the original endpoint with a ProxyEndPoint
+        // and the real endpoint information isn't available in the request object.
+        // This listener is added to all connectors to track the real addresses of the client and server.
+        class ProxyProtocolOriginalEndpointListener implements Connection.Listener {
+            @Override
+            public void onOpened(Connection connection) {
+                handleConnection(connection, true);
+            }
+
+            @Override
+            public void onClosed(Connection connection) {
+                handleConnection(connection, false);
+            }
+
+            private void handleConnection(Connection connection, boolean increment) {
+                if (connection.getEndPoint() instanceof ProxyConnectionFactory.ProxyEndPoint) {
+                    ProxyConnectionFactory.ProxyEndPoint proxyEndPoint =
+                            (ProxyConnectionFactory.ProxyEndPoint) connection.getEndPoint();
+                    EndPoint originalEndpoint = proxyEndPoint.unwrap();
+                    mapAddress(proxyEndPoint.getLocalAddress(), originalEndpoint.getLocalAddress(), increment);
+                    mapAddress(proxyEndPoint.getRemoteAddress(), originalEndpoint.getRemoteAddress(), increment);
+                }
+            }
+
+            private void mapAddress(InetSocketAddress current, InetSocketAddress real, boolean increment) {
+                // don't add the mapping if the current address is the same as the real address
+                if (real != null && current != null && current.equals(real)) {
+                    return;
+                }
+                AddressKey key = new AddressKey(current.getHostString(), current.getPort());
+                proxyProtocolRealAddressMapping.compute(key, (__, entry) -> {
+                    if (entry == null) {
+                        if (increment) {
+                            entry = new AddressEntry(real, new AtomicInteger(1));
+                        }
+                    } else {
+                        if (increment) {
+                            entry.referenceCount.incrementAndGet();
+                        } else {
+                            if (entry.referenceCount.decrementAndGet() == 0) {
+                                // remove the entry if the reference count drops to 0
+                                entry = null;
+                            }
+                        }
+                    }
+                    return entry;
+                });
+            }
+        }
     }
 }
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index 805ffc4e370..aabc799663c 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -164,6 +164,13 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>io.github.hakky54</groupId>
+      <artifactId>consolecaptor</artifactId>
+      <version>${consolecaptor.version}</version>
+      <scope>test</scope>
+    </dependency>
+
     <!-- zookeeper server -->
     <dependency>
        <groupId>io.dropwizard.metrics</groupId>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
index 8dc36e2917e..9a439268a8b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
@@ -31,12 +31,18 @@ import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.jetty.tls.JettySslContextFactory;
+import org.eclipse.jetty.server.ConnectionFactory;
 import org.eclipse.jetty.server.ConnectionLimit;
+import org.eclipse.jetty.server.ForwardedRequestCustomizer;
 import org.eclipse.jetty.server.Handler;
 import org.eclipse.jetty.server.HttpConfiguration;
 import org.eclipse.jetty.server.HttpConnectionFactory;
+import org.eclipse.jetty.server.ProxyConnectionFactory;
+import org.eclipse.jetty.server.RequestLog;
+import org.eclipse.jetty.server.SecureRequestCustomizer;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.SslConnectionFactory;
 import org.eclipse.jetty.server.handler.ContextHandler;
 import org.eclipse.jetty.server.handler.ContextHandlerCollection;
 import org.eclipse.jetty.server.handler.DefaultHandler;
@@ -103,9 +109,18 @@ public class WebService implements AutoCloseable {
 
         Optional<Integer> port = config.getWebServicePort();
         HttpConfiguration httpConfig = new HttpConfiguration();
+        if (config.isWebServiceTrustXForwardedFor()) {
+            httpConfig.addCustomizer(new ForwardedRequestCustomizer());
+        }
         httpConfig.setRequestHeaderSize(pulsar.getConfig().getHttpMaxRequestHeaderSize());
+        HttpConnectionFactory httpConnectionFactory = new HttpConnectionFactory(httpConfig);
         if (port.isPresent()) {
-            httpConnector = new ServerConnector(server, new HttpConnectionFactory(httpConfig));
+            List<ConnectionFactory> connectionFactories = new ArrayList<>();
+            if (config.isWebServiceHaProxyProtocolEnabled()) {
+                connectionFactories.add(new ProxyConnectionFactory());
+            }
+            connectionFactories.add(httpConnectionFactory);
+            httpConnector = new ServerConnector(server, connectionFactories.toArray(new ConnectionFactory[0]));
             httpConnector.setPort(port.get());
             httpConnector.setHost(pulsar.getBindAddress());
             connectors.add(httpConnector);
@@ -144,7 +159,18 @@ public class WebService implements AutoCloseable {
                             config.getWebServiceTlsProtocols(),
                             config.getTlsCertRefreshCheckDurationSec());
                 }
-                httpsConnector = new ServerConnector(server, sslCtxFactory, new HttpConnectionFactory(httpConfig));
+                List<ConnectionFactory> connectionFactories = new ArrayList<>();
+                if (config.isWebServiceHaProxyProtocolEnabled()) {
+                    connectionFactories.add(new ProxyConnectionFactory());
+                }
+                connectionFactories.add(new SslConnectionFactory(sslCtxFactory, httpConnectionFactory.getProtocol()));
+                connectionFactories.add(httpConnectionFactory);
+                // org.eclipse.jetty.server.AbstractConnectionFactory.getFactories contains similar logic
+                // this is needed for TLS authentication
+                if (httpConfig.getCustomizer(SecureRequestCustomizer.class) == null) {
+                    httpConfig.addCustomizer(new SecureRequestCustomizer());
+                }
+                httpsConnector = new ServerConnector(server, connectionFactories.toArray(new ConnectionFactory[0]));
                 httpsConnector.setPort(tlsPort.get());
                 httpsConnector.setHost(pulsar.getBindAddress());
                 connectors.add(httpsConnector);
@@ -284,7 +310,12 @@ public class WebService implements AutoCloseable {
     public void start() throws PulsarServerException {
         try {
             RequestLogHandler requestLogHandler = new RequestLogHandler();
-            requestLogHandler.setRequestLog(JettyRequestLogFactory.createRequestLogger());
+            boolean showDetailedAddresses = pulsar.getConfiguration().getWebServiceLogDetailedAddresses() != null
+                    ? pulsar.getConfiguration().getWebServiceLogDetailedAddresses() :
+                    (pulsar.getConfiguration().isWebServiceHaProxyProtocolEnabled()
+                            || pulsar.getConfiguration().isWebServiceTrustXForwardedFor());
+            RequestLog requestLogger = JettyRequestLogFactory.createRequestLogger(showDetailedAddresses, server);
+            requestLogHandler.setRequestLog(requestLogger);
             handlers.add(0, new ContextHandlerCollection());
             handlers.add(requestLogHandler);
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceOriginalClientIPTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceOriginalClientIPTest.java
new file mode 100644
index 00000000000..7f7fa85bd3b
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceOriginalClientIPTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.web;
+
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+import nl.altindag.console.ConsoleCaptor;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.assertj.core.api.ThrowingConsumer;
+import org.awaitility.Awaitility;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.ProxyProtocolClientConnectionFactory.V2;
+import org.eclipse.jetty.client.api.ContentResponse;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class WebServiceOriginalClientIPTest extends MockedPulsarServiceBaseTest {
+    HttpClient httpClient;
+
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        httpClient = new HttpClient(new SslContextFactory(true));
+        httpClient.start();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+        if (httpClient != null) {
+            httpClient.stop();
+        }
+    }
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        conf.setWebServiceTrustXForwardedFor(true);
+        conf.setWebServiceHaProxyProtocolEnabled(true);
+        conf.setWebServicePortTls(Optional.of(0));
+        conf.setTlsCertificateFilePath(BROKER_CERT_FILE_PATH);
+        conf.setTlsKeyFilePath(BROKER_KEY_FILE_PATH);
+        conf.setTlsTrustCertsFilePath(CA_CERT_FILE_PATH);
+    }
+
+    @DataProvider(name = "tlsEnabled")
+    public Object[][] tlsEnabled() {
+        return new Object[][] { { true }, { false } };
+    }
+
+    @Test(dataProvider = "tlsEnabled")
+    public void testClientIPIsPickedFromXForwardedForHeaderAndLogged(boolean tlsEnabled) throws Exception {
+        String metricsUrl =
+                (tlsEnabled ? pulsar.getWebServiceAddressTls() : pulsar.getWebServiceAddress()) + "/metrics/";
+        performLoggingTest(consoleCaptor -> {
+            // Send a GET request to the metrics URL
+            ContentResponse response = httpClient.newRequest(metricsUrl)
+                    .header("X-Forwarded-For", "11.22.33.44:12345")
+                    .send();
+
+            // Validate the response
+            assertTrue(response.getContentAsString().contains("process_cpu_seconds_total"));
+
+            // Validate that the client IP passed in X-Forwarded-For is logged
+            assertTrue(consoleCaptor.getStandardOutput().stream()
+                    .anyMatch(line -> line.contains("RequestLog") && line.contains("[R:11.22.33.44:12345 via ")));
+        });
+    }
+
+    @Test(dataProvider = "tlsEnabled")
+    public void testClientIPIsPickedFromForwardedHeaderAndLogged(boolean tlsEnabled) throws Exception {
+        String metricsUrl =
+                (tlsEnabled ? pulsar.getWebServiceAddressTls() : pulsar.getWebServiceAddress()) + "/metrics/";
+        performLoggingTest(consoleCaptor -> {
+            // Send a GET request to the metrics URL
+            ContentResponse response = httpClient.newRequest(metricsUrl)
+                    .header("Forwarded", "for=11.22.33.44:12345")
+                    .send();
+
+            // Validate the response
+            assertTrue(response.getContentAsString().contains("process_cpu_seconds_total"));
+
+            // Validate that the client IP passed in Forwarded is logged
+            assertTrue(consoleCaptor.getStandardOutput().stream()
+                    .anyMatch(line -> line.contains("RequestLog") && line.contains("[R:11.22.33.44:12345 via ")));
+        });
+    }
+
+    @Test(dataProvider = "tlsEnabled")
+    public void testClientIPIsPickedFromHAProxyProtocolAndLogged(boolean tlsEnabled) throws Exception {
+        String metricsUrl = (tlsEnabled ? pulsar.getWebServiceAddressTls() : pulsar.getWebServiceAddress()) + "/metrics/";
+        performLoggingTest(consoleCaptor -> {
+            // Send a GET request to the metrics URL
+            ContentResponse response = httpClient.newRequest(metricsUrl)
+                    // Jetty client will add HA Proxy protocol header with the given IP to the request
+                    .tag(new V2.Tag(V2.Tag.Command.PROXY, null, V2.Tag.Protocol.STREAM,
+                            // source IP and port
+                            "99.22.33.44", 1234,
+                            // destination IP and port
+                            "5.4.3.1", 4321,
+                            null))
+                    .send();
+
+            // Validate the response
+            assertTrue(response.getContentAsString().contains("process_cpu_seconds_total"));
+
+            // Validate that the client IP and destination IP passed in HA Proxy protocol is logged
+            assertTrue(consoleCaptor.getStandardOutput().stream()
+                    .anyMatch(line -> line.contains("RequestLog") && line.contains("[R:99.22.33.44:1234 via ")
+                            && line.contains(" dst 5.4.3.1:4321]")));
+        });
+    }
+
+    void performLoggingTest(ThrowingConsumer<ConsoleCaptor> testFunction) {
+        ConsoleCaptor consoleCaptor = new ConsoleCaptor();
+        try {
+            Awaitility.await().atMost(Duration.of(2, ChronoUnit.SECONDS)).untilAsserted(() -> {
+                consoleCaptor.clearOutput();
+                testFunction.accept(consoleCaptor);
+            });
+        } finally {
+            consoleCaptor.close();
+            System.out.println("--- Captured console output:");
+            consoleCaptor.getStandardOutput().forEach(System.out::println);
+            consoleCaptor.getErrorOutput().forEach(System.err::println);
+            System.out.println("--- End of captured console output");
+        }
+    }
+}
diff --git a/pulsar-broker/src/test/resources/log4j2.xml b/pulsar-broker/src/test/resources/log4j2.xml
new file mode 100644
index 00000000000..09a89702ee2
--- /dev/null
+++ b/pulsar-broker/src/test/resources/log4j2.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<Configuration xmlns="http://logging.apache.org/log4j/2.0/config"
+               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+               xsi:schemaLocation="http://logging.apache.org/log4j/2.0/config https://logging.apache.org/log4j/2.0/log4j-core.xsd">
+  <Appenders>
+    <!-- setting follow="true" is required for using ConsoleCaptor to validate log messages -->
+    <Console name="CONSOLE" target="SYSTEM_OUT" follow="true">
+      <PatternLayout pattern="%d{ISO8601} - %-5p - [%t:%c{1}] - %m%n"/>
+    </Console>
+  </Appenders>
+  <Loggers>
+<!--    <Logger name="org.apache.pulsar.broker.service.persistent.PersistentTopic" level="DEBUG" additivity="false">-->
+<!--       <AppenderRef ref="CONSOLE" />-->
+<!--    </Logger>-->
+
+    <Root level="INFO">
+      <AppenderRef ref="CONSOLE"/>
+    </Root>
+  </Loggers>
+</Configuration>
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index ec0e620d0ae..036311ea132 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -163,6 +163,22 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
             + "(0 to disable limiting)")
     private int maxHttpServerConnections = 2048;
 
+    @FieldContext(category = CATEGORY_WORKER,
+            doc = "Enable or disable the use of HA proxy protocol for resolving the client IP for http/https "
+                    + "requests. Default is false.")
+    private boolean webServiceHaProxyProtocolEnabled = false;
+
+    @FieldContext(category = CATEGORY_WORKER, doc =
+            "Trust X-Forwarded-For header for resolving the client IP for http/https requests.\n"
+                    + "Default is false.")
+    private boolean webServiceTrustXForwardedFor = false;
+
+    @FieldContext(category = CATEGORY_WORKER, doc =
+            "Add detailed client/remote and server/local addresses and ports to http/https request logging.\n"
+                    + "Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor "
+                    + "is enabled.")
+    private Boolean webServiceLogDetailedAddresses;
+
     @FieldContext(
             category = CATEGORY_WORKER,
             required = false,
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
index 2b3ea301210..583d8ce558b 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
@@ -35,10 +35,17 @@ import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.rest.api.v2.WorkerApiV2Resource;
 import org.apache.pulsar.functions.worker.rest.api.v2.WorkerStatsApiV2Resource;
 import org.apache.pulsar.jetty.tls.JettySslContextFactory;
+import org.eclipse.jetty.server.ConnectionFactory;
 import org.eclipse.jetty.server.ConnectionLimit;
+import org.eclipse.jetty.server.ForwardedRequestCustomizer;
 import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.HttpConfiguration;
+import org.eclipse.jetty.server.HttpConnectionFactory;
+import org.eclipse.jetty.server.ProxyConnectionFactory;
+import org.eclipse.jetty.server.SecureRequestCustomizer;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.SslConnectionFactory;
 import org.eclipse.jetty.server.handler.ContextHandlerCollection;
 import org.eclipse.jetty.server.handler.DefaultHandler;
 import org.eclipse.jetty.server.handler.HandlerCollection;
@@ -88,10 +95,21 @@ public class WorkerServer {
             server.addBean(new ConnectionLimit(workerConfig.getMaxHttpServerConnections(), server));
         }
 
+        HttpConfiguration httpConfig = new HttpConfiguration();
+        if (workerConfig.isWebServiceTrustXForwardedFor()) {
+            httpConfig.addCustomizer(new ForwardedRequestCustomizer());
+        }
+        HttpConnectionFactory httpConnectionFactory = new HttpConnectionFactory(httpConfig);
+
         List<ServerConnector> connectors = new ArrayList<>();
         if (this.workerConfig.getWorkerPort() != null) {
             log.info("Configuring http server on port={}", this.workerConfig.getWorkerPort());
-            httpConnector = new ServerConnector(server);
+            List<ConnectionFactory> connectionFactories = new ArrayList<>();
+            if (workerConfig.isWebServiceHaProxyProtocolEnabled()) {
+                connectionFactories.add(new ProxyConnectionFactory());
+            }
+            connectionFactories.add(httpConnectionFactory);
+            httpConnector = new ServerConnector(server, connectionFactories.toArray(new ConnectionFactory[0]));
             httpConnector.setPort(this.workerConfig.getWorkerPort());
             connectors.add(httpConnector);
         }
@@ -109,7 +127,10 @@ public class WorkerServer {
             workerConfig.isAuthenticateMetricsEndpoint(), filterInitializer));
 
         RequestLogHandler requestLogHandler = new RequestLogHandler();
-        requestLogHandler.setRequestLog(JettyRequestLogFactory.createRequestLogger());
+        boolean showDetailedAddresses = workerConfig.getWebServiceLogDetailedAddresses() != null
+                ? workerConfig.getWebServiceLogDetailedAddresses() :
+                (workerConfig.isWebServiceHaProxyProtocolEnabled() || workerConfig.isWebServiceTrustXForwardedFor());
+        requestLogHandler.setRequestLog(JettyRequestLogFactory.createRequestLogger(showDetailedAddresses, server));
         handlers.add(0, new ContextHandlerCollection());
         handlers.add(requestLogHandler);
 
@@ -161,7 +182,18 @@ public class WorkerServer {
                             workerConfig.getTlsCertRefreshCheckDurationSec()
                     );
                 }
-                httpsConnector = new ServerConnector(server, sslCtxFactory);
+                List<ConnectionFactory> connectionFactories = new ArrayList<>();
+                if (workerConfig.isWebServiceHaProxyProtocolEnabled()) {
+                    connectionFactories.add(new ProxyConnectionFactory());
+                }
+                connectionFactories.add(new SslConnectionFactory(sslCtxFactory, httpConnectionFactory.getProtocol()));
+                connectionFactories.add(httpConnectionFactory);
+                // org.eclipse.jetty.server.AbstractConnectionFactory.getFactories contains similar logic
+                // this is needed for TLS authentication
+                if (httpConfig.getCustomizer(SecureRequestCustomizer.class) == null) {
+                    httpConfig.addCustomizer(new SecureRequestCustomizer());
+                }
+                httpsConnector = new ServerConnector(server, connectionFactories.toArray(new ConnectionFactory[0]));
                 httpsConnector.setPort(this.workerConfig.getWorkerPortTls());
                 connectors.add(httpsConnector);
             } catch (Exception e) {
diff --git a/pulsar-proxy/pom.xml b/pulsar-proxy/pom.xml
index 85021e4b984..bd9ca6242bf 100644
--- a/pulsar-proxy/pom.xml
+++ b/pulsar-proxy/pom.xml
@@ -203,6 +203,12 @@
       <version>${wiremock.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>io.github.hakky54</groupId>
+      <artifactId>consolecaptor</artifactId>
+      <version>${consolecaptor.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     <plugins>
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index 39c8fb5e086..d65408748f4 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -268,6 +268,22 @@ public class ProxyConfiguration implements PulsarConfiguration {
             doc = "Enable or disable the proxy protocol.")
     private boolean haProxyProtocolEnabled;
 
+    @FieldContext(category = CATEGORY_SERVER,
+            doc = "Enable or disable the use of HA proxy protocol for resolving the client IP for http/https "
+                    + "requests. Default is false.")
+    private boolean webServiceHaProxyProtocolEnabled = false;
+
+    @FieldContext(category = CATEGORY_SERVER, doc =
+            "Trust X-Forwarded-For header for resolving the client IP for http/https requests.\n"
+                    + "Default is false.")
+    private boolean webServiceTrustXForwardedFor = false;
+
+    @FieldContext(category = CATEGORY_SERVER, doc =
+            "Add detailed client/remote and server/local addresses and ports to http/https request logging.\n"
+                    + "Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor "
+                    + "is enabled.")
+    private Boolean webServiceLogDetailedAddresses;
+
     @FieldContext(category = CATEGORY_SERVER,
             doc = "Enables zero-copy transport of data across network interfaces using the spice. "
                     + "Zero copy mode cannot be used when TLS is enabled or when proxyLogLevel is > 0.")
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
index 8ac60b21a11..ea5b77e902c 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
@@ -37,6 +37,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
 import lombok.Getter;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.core.util.datetime.FixedDateFormat;
@@ -106,8 +107,15 @@ public class ProxyServiceStarter {
     private WebServer server;
     private WebSocketService webSocketService;
     private static boolean metricsInitialized;
+    private boolean embeddedMode;
 
     public ProxyServiceStarter(String[] args) throws Exception {
+        this(args, null, false);
+    }
+
+    public ProxyServiceStarter(String[] args, Consumer<ProxyConfiguration> proxyConfigurationCustomizer,
+                               boolean embeddedMode) throws Exception {
+        this.embeddedMode = embeddedMode;
         try {
             DateFormat dateFormat = new SimpleDateFormat(
                 FixedDateFormat.FixedFormat.ISO8601_OFFSET_DATE_TIME_HHMM.getPattern());
@@ -130,15 +138,26 @@ public class ProxyServiceStarter {
                     CmdGenerateDocs cmd = new CmdGenerateDocs("pulsar");
                     cmd.addCommand("proxy", this);
                     cmd.run(null);
-                    System.exit(0);
+                    if (embeddedMode) {
+                        return;
+                    } else {
+                        System.exit(0);
+                    }
                 }
             } catch (Exception e) {
                 jcommander.usage();
-                System.exit(1);
+                if (embeddedMode) {
+                    return;
+                } else {
+                    System.exit(1);
+                }
             }
 
             // load config file
             config = PulsarConfigurationLoader.create(configFile, ProxyConfiguration.class);
+            if (proxyConfigurationCustomizer != null) {
+                proxyConfigurationCustomizer.accept(config);
+            }
 
             if (!isBlank(zookeeperServers)) {
                 // Use zookeeperServers from command line
@@ -228,7 +247,9 @@ public class ProxyServiceStarter {
         // create a web-service
         server = new WebServer(config, authenticationService);
 
-        Runtime.getRuntime().addShutdownHook(new Thread(this::close));
+        if (!embeddedMode) {
+            Runtime.getRuntime().addShutdownHook(new Thread(this::close));
+        }
 
         proxyService.start();
 
@@ -291,7 +312,9 @@ public class ProxyServiceStarter {
         } catch (Exception e) {
             log.warn("server couldn't stop gracefully {}", e.getMessage(), e);
         } finally {
-            LogManager.shutdown();
+            if (!embeddedMode) {
+                LogManager.shutdown();
+            }
         }
     }
 
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
index b95bbcab08b..478b911eb23 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
@@ -37,13 +37,18 @@ import org.apache.pulsar.broker.web.JsonMapperProvider;
 import org.apache.pulsar.broker.web.RateLimitingFilter;
 import org.apache.pulsar.broker.web.WebExecutorThreadPool;
 import org.apache.pulsar.jetty.tls.JettySslContextFactory;
+import org.eclipse.jetty.server.ConnectionFactory;
 import org.eclipse.jetty.server.ConnectionLimit;
 import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.ForwardedRequestCustomizer;
 import org.eclipse.jetty.server.Handler;
 import org.eclipse.jetty.server.HttpConfiguration;
 import org.eclipse.jetty.server.HttpConnectionFactory;
+import org.eclipse.jetty.server.ProxyConnectionFactory;
+import org.eclipse.jetty.server.SecureRequestCustomizer;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.SslConnectionFactory;
 import org.eclipse.jetty.server.handler.ContextHandlerCollection;
 import org.eclipse.jetty.server.handler.DefaultHandler;
 import org.eclipse.jetty.server.handler.HandlerCollection;
@@ -93,12 +98,21 @@ public class WebServer {
         List<ServerConnector> connectors = new ArrayList<>();
 
         HttpConfiguration httpConfig = new HttpConfiguration();
+        if (config.isWebServiceTrustXForwardedFor()) {
+            httpConfig.addCustomizer(new ForwardedRequestCustomizer());
+        }
         httpConfig.setOutputBufferSize(config.getHttpOutputBufferSize());
         httpConfig.setRequestHeaderSize(config.getHttpMaxRequestHeaderSize());
 
+        HttpConnectionFactory httpConnectionFactory = new HttpConnectionFactory(httpConfig);
         if (config.getWebServicePort().isPresent()) {
             this.externalServicePort = config.getWebServicePort().get();
-            connector = new ServerConnector(server, new HttpConnectionFactory(httpConfig));
+            List<ConnectionFactory> connectionFactories = new ArrayList<>();
+            if (config.isWebServiceHaProxyProtocolEnabled()) {
+                connectionFactories.add(new ProxyConnectionFactory());
+            }
+            connectionFactories.add(httpConnectionFactory);
+            connector = new ServerConnector(server, connectionFactories.toArray(new ConnectionFactory[0]));
             connector.setHost(config.getBindAddress());
             connector.setPort(externalServicePort);
             connectors.add(connector);
@@ -133,7 +147,18 @@ public class WebServer {
                             config.getWebServiceTlsProtocols(),
                             config.getTlsCertRefreshCheckDurationSec());
                 }
-                connectorTls = new ServerConnector(server, sslCtxFactory, new HttpConnectionFactory(httpConfig));
+                List<ConnectionFactory> connectionFactories = new ArrayList<>();
+                if (config.isWebServiceHaProxyProtocolEnabled()) {
+                    connectionFactories.add(new ProxyConnectionFactory());
+                }
+                connectionFactories.add(new SslConnectionFactory(sslCtxFactory, httpConnectionFactory.getProtocol()));
+                connectionFactories.add(httpConnectionFactory);
+                // org.eclipse.jetty.server.AbstractConnectionFactory.getFactories contains similar logic
+                // this is needed for TLS authentication
+                if (httpConfig.getCustomizer(SecureRequestCustomizer.class) == null) {
+                    httpConfig.addCustomizer(new SecureRequestCustomizer());
+                }
+                connectorTls = new ServerConnector(server, connectionFactories.toArray(new ConnectionFactory[0]));
                 connectorTls.setPort(config.getWebServicePortTls().get());
                 connectorTls.setHost(config.getBindAddress());
                 connectors.add(connectorTls);
@@ -281,7 +306,10 @@ public class WebServer {
 
     public void start() throws Exception {
         RequestLogHandler requestLogHandler = new RequestLogHandler();
-        requestLogHandler.setRequestLog(JettyRequestLogFactory.createRequestLogger());
+        boolean showDetailedAddresses = config.getWebServiceLogDetailedAddresses() != null
+                ? config.getWebServiceLogDetailedAddresses() :
+                (config.isWebServiceHaProxyProtocolEnabled() || config.isWebServiceTrustXForwardedFor());
+        requestLogHandler.setRequestLog(JettyRequestLogFactory.createRequestLogger(showDetailedAddresses, server));
         handlers.add(0, new ContextHandlerCollection());
         handlers.add(requestLogHandler);
 
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyOriginalClientIPTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyOriginalClientIPTest.java
new file mode 100644
index 00000000000..b267439d471
--- /dev/null
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyOriginalClientIPTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+import nl.altindag.console.ConsoleCaptor;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.assertj.core.api.ThrowingConsumer;
+import org.awaitility.Awaitility;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.ProxyProtocolClientConnectionFactory.V2;
+import org.eclipse.jetty.client.api.ContentResponse;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class ProxyOriginalClientIPTest extends MockedPulsarServiceBaseTest {
+    static final String[] ARGS = new String[]{"-c", "./src/test/resources/proxy.conf"};
+    HttpClient httpClient;
+    ProxyServiceStarter serviceStarter;
+    String webServiceUrl;
+    String webServiceUrlTls;
+
+    @Override
+    @BeforeClass
+    protected void setup() throws Exception {
+        internalSetup();
+        serviceStarter = new ProxyServiceStarter(ARGS, proxyConfig -> {
+            proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
+            proxyConfig.setBrokerWebServiceURL(pulsar.getWebServiceAddress());
+            proxyConfig.setWebServicePort(Optional.of(0));
+            proxyConfig.setWebServicePortTls(Optional.of(0));
+            proxyConfig.setTlsEnabledWithBroker(false);
+            proxyConfig.setTlsCertificateFilePath(PROXY_CERT_FILE_PATH);
+            proxyConfig.setTlsKeyFilePath(PROXY_KEY_FILE_PATH);
+            proxyConfig.setServicePort(Optional.of(0));
+            proxyConfig.setWebSocketServiceEnabled(true);
+            proxyConfig.setBrokerProxyAllowedTargetPorts("*");
+            proxyConfig.setClusterName(configClusterName);
+            proxyConfig.setWebServiceTrustXForwardedFor(true);
+            proxyConfig.setWebServiceHaProxyProtocolEnabled(true);
+        }, true);
+        serviceStarter.start();
+        webServiceUrl = "http://localhost:" + serviceStarter.getServer().getListenPortHTTP().get();
+        webServiceUrlTls = "https://localhost:" + serviceStarter.getServer().getListenPortHTTPS().get();
+        httpClient = new HttpClient(new SslContextFactory(true));
+        httpClient.start();
+    }
+
+    @Override
+    @AfterClass(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        internalCleanup();
+        if (serviceStarter != null) {
+            serviceStarter.close();
+        }
+        if (httpClient != null) {
+            httpClient.stop();
+        }
+    }
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        conf.setWebServiceTrustXForwardedFor(true);
+    }
+
+    @DataProvider(name = "tlsEnabled")
+    public Object[][] tlsEnabled() {
+        return new Object[][] { { true }, { false } };
+    }
+
+    @Test(dataProvider = "tlsEnabled")
+    public void testClientIPIsPickedFromXForwardedForHeaderAndLogged(boolean tlsEnabled) throws Exception {
+        String url = (tlsEnabled ? webServiceUrlTls : webServiceUrl) + "/admin/v2/brokers/leaderBroker";
+        performLoggingTest(consoleCaptor -> {
+            // Send a GET request to the metrics URL
+            ContentResponse response = httpClient.newRequest(url)
+                    .header("X-Forwarded-For", "11.22.33.44")
+                    .send();
+
+            // Validate the response
+            assertTrue(response.getContentAsString().contains("\"brokerId\":\"" + pulsar.getBrokerId() + "\""));
+
+            // Validate that the client IP passed in X-Forwarded-For is logged
+            assertTrue(consoleCaptor.getStandardOutput().stream()
+                    .anyMatch(line -> line.contains("pulsar-external-web-") && line.contains("RequestLog")
+                            && line.contains("R:11.22.33.44")), "Expected to find client IP in proxy logs");
+            assertTrue(consoleCaptor.getStandardOutput().stream()
+                    .anyMatch(line -> line.contains("pulsar-web-") && line.contains("RequestLog")
+                            && line.contains("R:11.22.33.44")), "Expected to find client IP in broker logs");
+        });
+    }
+
+    @Test(dataProvider = "tlsEnabled")
+    public void testClientIPIsPickedFromHAProxyProtocolAndLogged(boolean tlsEnabled) throws Exception {
+        String url = (tlsEnabled ? webServiceUrlTls : webServiceUrl) + "/admin/v2/brokers/leaderBroker";
+        performLoggingTest(consoleCaptor -> {
+            // Send a GET request to the metrics URL
+            ContentResponse response = httpClient.newRequest(url)
+                    // Jetty client will add HA Proxy protocol header with the given IP to the request
+                    .tag(new V2.Tag("99.22.33.44", 1234))
+                    .send();
+
+            // Validate the response
+            assertTrue(response.getContentAsString().contains("\"brokerId\":\"" + pulsar.getBrokerId() + "\""));
+
+            // Validate that the client IP passed in HA proxy protocol is logged
+            assertTrue(consoleCaptor.getStandardOutput().stream()
+                    .anyMatch(line -> line.contains("pulsar-external-web-") && line.contains("RequestLog")
+                            && line.contains("R:99.22.33.44")), "Expected to find client IP in proxy logs");
+            assertTrue(consoleCaptor.getStandardOutput().stream()
+                    .anyMatch(line -> line.contains("pulsar-web-") && line.contains("RequestLog")
+                            && line.contains("R:99.22.33.44")), "Expected to find client IP in broker logs");
+        });
+    }
+
+    void performLoggingTest(ThrowingConsumer<ConsoleCaptor> testFunction) {
+        ConsoleCaptor consoleCaptor = new ConsoleCaptor();
+        try {
+            Awaitility.await().atMost(Duration.of(2, ChronoUnit.SECONDS)).untilAsserted(() -> {
+                consoleCaptor.clearOutput();
+                testFunction.accept(consoleCaptor);
+            });
+        } finally {
+            consoleCaptor.close();
+            System.out.println("--- Captured console output:");
+            consoleCaptor.getStandardOutput().forEach(System.out::println);
+            consoleCaptor.getErrorOutput().forEach(System.err::println);
+            System.out.println("--- End of captured console output");
+        }
+    }
+}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterDisableZeroCopyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterDisableZeroCopyTest.java
index 0c9fa5c7ac3..9eaa9927416 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterDisableZeroCopyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterDisableZeroCopyTest.java
@@ -27,7 +27,7 @@ public class ProxyServiceStarterDisableZeroCopyTest extends ProxyServiceStarterT
     @BeforeClass
     protected void setup() throws Exception {
         internalSetup();
-        serviceStarter = new ProxyServiceStarter(ARGS);
+        serviceStarter = new ProxyServiceStarter(ARGS, null, true);
         serviceStarter.getConfig().setBrokerServiceURL(pulsar.getBrokerServiceUrl());
         serviceStarter.getConfig().setBrokerWebServiceURL(pulsar.getWebServiceAddress());
         serviceStarter.getConfig().setWebServicePort(Optional.of(0));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
index 71b1087ee64..21e7b4fd87a 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
@@ -54,7 +54,7 @@ public class ProxyServiceStarterTest extends MockedPulsarServiceBaseTest {
     @BeforeClass
     protected void setup() throws Exception {
         internalSetup();
-        serviceStarter = new ProxyServiceStarter(ARGS);
+        serviceStarter = new ProxyServiceStarter(ARGS, null, true);
         serviceStarter.getConfig().setBrokerServiceURL(pulsar.getBrokerServiceUrl());
         serviceStarter.getConfig().setBrokerWebServiceURL(pulsar.getWebServiceAddress());
         serviceStarter.getConfig().setWebServicePort(Optional.of(0));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
index b21162577a2..ef295ef3ef3 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
@@ -55,7 +55,7 @@ public class ProxyServiceTlsStarterTest extends MockedPulsarServiceBaseTest {
     @BeforeClass
     protected void setup() throws Exception {
         internalSetup();
-        serviceStarter = new ProxyServiceStarter(ARGS);
+        serviceStarter = new ProxyServiceStarter(ARGS, null, true);
         serviceStarter.getConfig().setBrokerServiceURL(pulsar.getBrokerServiceUrl());
         serviceStarter.getConfig().setBrokerServiceURLTLS(pulsar.getBrokerServiceUrlTls());
         serviceStarter.getConfig().setBrokerWebServiceURL(pulsar.getWebServiceAddress());
diff --git a/pulsar-proxy/src/test/resources/log4j2.xml b/pulsar-proxy/src/test/resources/log4j2.xml
new file mode 100644
index 00000000000..261bd2edf69
--- /dev/null
+++ b/pulsar-proxy/src/test/resources/log4j2.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<Configuration xmlns="http://logging.apache.org/log4j/2.0/config"
+               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+               xsi:schemaLocation="http://logging.apache.org/log4j/2.0/config https://logging.apache.org/log4j/2.0/log4j-core.xsd">
+  <Appenders>
+    <!-- setting follow="true" is required for using ConsoleCaptor to validate log messages -->
+    <Console name="CONSOLE" target="SYSTEM_OUT" follow="true">
+      <PatternLayout pattern="%d{ISO8601} - %-5p - [%t:%c{1}] - %m%n"/>
+    </Console>
+  </Appenders>
+  <Loggers>
+    <Root level="INFO">
+      <AppenderRef ref="CONSOLE"/>
+    </Root>
+  </Loggers>
+</Configuration>
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java
index 7aed43d056c..bbb34a3e3f7 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java
@@ -35,10 +35,17 @@ import org.apache.pulsar.broker.web.JsonMapperProvider;
 import org.apache.pulsar.broker.web.WebExecutorThreadPool;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.jetty.tls.JettySslContextFactory;
+import org.eclipse.jetty.server.ConnectionFactory;
 import org.eclipse.jetty.server.ConnectionLimit;
+import org.eclipse.jetty.server.ForwardedRequestCustomizer;
 import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.HttpConfiguration;
+import org.eclipse.jetty.server.HttpConnectionFactory;
+import org.eclipse.jetty.server.ProxyConnectionFactory;
+import org.eclipse.jetty.server.SecureRequestCustomizer;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.SslConnectionFactory;
 import org.eclipse.jetty.server.handler.ContextHandlerCollection;
 import org.eclipse.jetty.server.handler.DefaultHandler;
 import org.eclipse.jetty.server.handler.HandlerCollection;
@@ -73,10 +80,22 @@ public class ProxyServer {
         if (config.getMaxHttpServerConnections() > 0) {
             server.addBean(new ConnectionLimit(config.getMaxHttpServerConnections(), server));
         }
+
+        HttpConfiguration httpConfig = new HttpConfiguration();
+        if (config.isWebServiceTrustXForwardedFor()) {
+            httpConfig.addCustomizer(new ForwardedRequestCustomizer());
+        }
+        HttpConnectionFactory httpConnectionFactory = new HttpConnectionFactory(httpConfig);
+
         List<ServerConnector> connectors = new ArrayList<>();
 
         if (config.getWebServicePort().isPresent()) {
-            connector = new ServerConnector(server);
+            List<ConnectionFactory> connectionFactories = new ArrayList<>();
+            if (config.isWebServiceHaProxyProtocolEnabled()) {
+                connectionFactories.add(new ProxyConnectionFactory());
+            }
+            connectionFactories.add(httpConnectionFactory);
+            connector = new ServerConnector(server, connectionFactories.toArray(new ConnectionFactory[0]));
             connector.setPort(config.getWebServicePort().get());
             connectors.add(connector);
         }
@@ -111,7 +130,18 @@ public class ProxyServer {
                             config.getWebServiceTlsProtocols(),
                             config.getTlsCertRefreshCheckDurationSec());
                 }
-                connectorTls = new ServerConnector(server, sslCtxFactory);
+                List<ConnectionFactory> connectionFactories = new ArrayList<>();
+                if (config.isWebServiceHaProxyProtocolEnabled()) {
+                    connectionFactories.add(new ProxyConnectionFactory());
+                }
+                connectionFactories.add(new SslConnectionFactory(sslCtxFactory, httpConnectionFactory.getProtocol()));
+                connectionFactories.add(httpConnectionFactory);
+                // org.eclipse.jetty.server.AbstractConnectionFactory.getFactories contains similar logic
+                // this is needed for TLS authentication
+                if (httpConfig.getCustomizer(SecureRequestCustomizer.class) == null) {
+                    httpConfig.addCustomizer(new SecureRequestCustomizer());
+                }
+                connectorTls = new ServerConnector(server, connectionFactories.toArray(new ConnectionFactory[0]));
                 connectorTls.setPort(config.getWebServicePortTls().get());
                 connectors.add(connectorTls);
             } catch (Exception e) {
@@ -169,7 +199,10 @@ public class ProxyServer {
                 .map(ServerConnector.class::cast).map(ServerConnector::getPort).map(Object::toString)
                 .collect(Collectors.joining(",")));
         RequestLogHandler requestLogHandler = new RequestLogHandler();
-        requestLogHandler.setRequestLog(JettyRequestLogFactory.createRequestLogger());
+        boolean showDetailedAddresses = conf.getWebServiceLogDetailedAddresses() != null
+                ? conf.getWebServiceLogDetailedAddresses() :
+                (conf.isWebServiceHaProxyProtocolEnabled() || conf.isWebServiceTrustXForwardedFor());
+        requestLogHandler.setRequestLog(JettyRequestLogFactory.createRequestLogger(showDetailedAddresses, server));
         handlers.add(0, new ContextHandlerCollection());
         handlers.add(requestLogHandler);
 
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
index 7acfd4a64ad..3fcbcf4b215 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
@@ -96,6 +96,20 @@ public class WebSocketProxyConfiguration implements PulsarConfiguration {
     @FieldContext(doc = "Hostname or IP address the service binds on, default is 0.0.0.0.")
     private String bindAddress = "0.0.0.0";
 
+    @FieldContext(doc = "Enable or disable the use of HA proxy protocol for resolving the client IP for http/https "
+                    + "requests. Default is false.")
+    private boolean webServiceHaProxyProtocolEnabled = false;
+
+    @FieldContext(doc = "Trust X-Forwarded-For header for resolving the client IP for http/https requests.\n"
+                    + "Default is false.")
+    private boolean webServiceTrustXForwardedFor = false;
+
+    @FieldContext(doc =
+            "Add detailed client/remote and server/local addresses and ports to http/https request logging.\n"
+                    + "Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor "
+                    + "is enabled.")
+    private Boolean webServiceLogDetailedAddresses;
+
     @FieldContext(doc = "Maximum size of a text message during parsing in WebSocket proxy")
     private int webSocketMaxTextFrameSize = 1024 * 1024;
     // --- Authentication ---