You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ja...@apache.org on 2018/03/29 17:37:09 UTC

[incubator-pulsar] branch master updated: Added Throttling mechanism to Pulsar Proxy (#1453)

This is an automated email from the ASF dual-hosted git repository.

jai1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 56e0109  Added Throttling mechanism to Pulsar Proxy (#1453)
56e0109 is described below

commit 56e0109657e3b53690d9c91bfe7a231474a36895
Author: Jai Asher <ja...@ccs.neu.edu>
AuthorDate: Thu Mar 29 10:37:05 2018 -0700

    Added Throttling mechanism to Pulsar Proxy (#1453)
---
 conf/proxy.conf                                    |   7 ++
 .../pulsar/proxy/server/DirectProxyHandler.java    |   2 -
 .../pulsar/proxy/server/LookupProxyHandler.java    | 125 ++++++++++++---------
 .../pulsar/proxy/server/ProxyConfiguration.java    |  24 +++-
 .../pulsar/proxy/server/ProxyConnection.java       |  28 ++++-
 .../apache/pulsar/proxy/server/ProxyService.java   |  13 ++-
 .../server/ProxyConnectionThrottlingTest.java      |  87 ++++++++++++++
 .../proxy/server/ProxyLookupThrottlingTest.java    |  91 +++++++++++++++
 8 files changed, 313 insertions(+), 64 deletions(-)

diff --git a/conf/proxy.conf b/conf/proxy.conf
index 5d0647d..f731240 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -69,6 +69,13 @@ superUserRoles=
 # make sure authentication is enabled for this to take effect
 forwardAuthorizationCredentials=false
 
+# --- RateLimiting ----
+# Max concurrent inbound connections; the proxy will reject connections beyond this limit. Default value is 10,000
+maxConcurrentInboundConnections=10000
+
+# Max concurrent lookup requests; the proxy will error out lookup requests beyond this limit. Default value is 10,000
+maxConcurrentLookupRequests=10000
+
 ##### --- TLS --- #####
 
 # Enable TLS in the proxy
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index 92ff107..8b224f6 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -58,7 +58,6 @@ public class DirectProxyHandler {
     private String originalPrincipal;
     private String clientAuthData;
     private String clientAuthMethod;
-    private boolean forwardAuthData;
     public static final String TLS_HANDLER = "tls";
 
     private final Authentication authentication;
@@ -70,7 +69,6 @@ public class DirectProxyHandler {
         this.clientAuthData = proxyConnection.clientAuthData;
         this.clientAuthMethod = proxyConnection.clientAuthMethod;
         ProxyConfiguration config = service.getConfiguration();
-        this.forwardAuthData = service.getConfiguration().forwardAuthorizationCredentials();
 
         // Start the connection attempt.
         Bootstrap b = new Bootstrap();
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
index 6da5a89..aad42df 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
@@ -65,28 +65,37 @@ public class LookupProxyHandler {
         if (log.isDebugEnabled()) {
             log.debug("Received Lookup from {}", clientAddress);
         }
-
-        lookupRequests.inc();
         long clientRequestId = lookup.getRequestId();
-        String topic = lookup.getTopic();
-        String serviceUrl;
-        if (isBlank(brokerServiceURL)) {
-            ServiceLookupData availableBroker = null;
-            try {
-                availableBroker = service.getDiscoveryProvider().nextBroker();
-            } catch (Exception e) {
-                log.warn("[{}] Failed to get next active broker {}", clientAddress, e.getMessage(), e);
-                proxyConnection.ctx().writeAndFlush(
-                        Commands.newLookupErrorResponse(ServerError.ServiceNotReady, e.getMessage(), clientRequestId));
-                return;
+        if (this.service.getLookupRequestSemaphore().tryAcquire()) {
+            lookupRequests.inc();
+            String topic = lookup.getTopic();
+            String serviceUrl;
+            if (isBlank(brokerServiceURL)) {
+                ServiceLookupData availableBroker = null;
+                try {
+                    availableBroker = service.getDiscoveryProvider().nextBroker();
+                } catch (Exception e) {
+                    log.warn("[{}] Failed to get next active broker {}", clientAddress, e.getMessage(), e);
+                    proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(ServerError.ServiceNotReady,
+                            e.getMessage(), clientRequestId));
+                    return;
+                }
+                serviceUrl = this.connectWithTLS ? availableBroker.getPulsarServiceUrlTls()
+                        : availableBroker.getPulsarServiceUrl();
+            } else {
+                serviceUrl = this.connectWithTLS ? service.getConfiguration().getBrokerServiceURLTLS()
+                        : service.getConfiguration().getBrokerServiceURL();
             }
-            serviceUrl = this.connectWithTLS ? availableBroker.getPulsarServiceUrlTls()
-                    : availableBroker.getPulsarServiceUrl();
+            performLookup(clientRequestId, topic, serviceUrl, false, 10);
+            this.service.getLookupRequestSemaphore().release();
         } else {
-            serviceUrl = this.connectWithTLS ? service.getConfiguration().getBrokerServiceURLTLS()
-                    : service.getConfiguration().getBrokerServiceURL();
+            if (log.isDebugEnabled()) {
+                log.debug("Request ID {} from {} rejected - Too many concurrent lookup requests.", clientRequestId, clientAddress);
+            }
+            proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(ServerError.ServiceNotReady,
+                    "Too many concurrent lookup requests", clientRequestId));
         }
-        performLookup(clientRequestId, topic, serviceUrl, false, 10);
+
     }
 
     private void performLookup(long clientRequestId, String topic, String brokerServiceUrl, boolean authoritative,
@@ -121,27 +130,26 @@ public class LookupProxyHandler {
             } else {
                 command = Commands.newLookup(topic, authoritative, requestId);
             }
-            clientCnx.newLookup(command,
-                    requestId).thenAccept(result -> {
-                        String brokerUrl = connectWithTLS ? result.brokerUrlTls : result.brokerUrl;
-                        if (result.redirect) {
-                            // Need to try the lookup again on a different broker
-                            performLookup(clientRequestId, topic, brokerUrl, result.authoritative, numberOfRetries - 1);
-                        } else {
-                            // Reply the same address for both TLS non-TLS. The reason is that whether we use TLS
-                            // between proxy
-                            // and broker is independent of whether the client itself uses TLS, but we need to force the
-                            // client
-                            // to use the appropriate target broker (and port) when it will connect back.
-                            proxyConnection.ctx().writeAndFlush(Commands.newLookupResponse(brokerUrl, brokerUrl, true,
-                                    LookupType.Connect, clientRequestId, true /* this is coming from proxy */));
-                        }
-                    }).exceptionally(ex -> {
-                        log.warn("[{}] Failed to lookup topic {}: {}", clientAddress, topic, ex.getMessage());
-                        proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(ServerError.ServiceNotReady,
-                                ex.getMessage(), clientRequestId));
-                        return null;
-                    });
+            clientCnx.newLookup(command, requestId).thenAccept(result -> {
+                String brokerUrl = connectWithTLS ? result.brokerUrlTls : result.brokerUrl;
+                if (result.redirect) {
+                    // Need to try the lookup again on a different broker
+                    performLookup(clientRequestId, topic, brokerUrl, result.authoritative, numberOfRetries - 1);
+                } else {
+                    // Reply the same address for both TLS non-TLS. The reason is that whether we use TLS
+                    // between proxy
+                    // and broker is independent of whether the client itself uses TLS, but we need to force the
+                    // client
+                    // to use the appropriate target broker (and port) when it will connect back.
+                    proxyConnection.ctx().writeAndFlush(Commands.newLookupResponse(brokerUrl, brokerUrl, true,
+                            LookupType.Connect, clientRequestId, true /* this is coming from proxy */));
+                }
+            }).exceptionally(ex -> {
+                log.warn("[{}] Failed to lookup topic {}: {}", clientAddress, topic, ex.getMessage());
+                proxyConnection.ctx().writeAndFlush(
+                        Commands.newLookupErrorResponse(ServerError.ServiceNotReady, ex.getMessage(), clientRequestId));
+                return null;
+            });
         }).exceptionally(ex -> {
             // Failed to connect to backend broker
             proxyConnection.ctx().writeAndFlush(
@@ -155,13 +163,22 @@ public class LookupProxyHandler {
         if (log.isDebugEnabled()) {
             log.debug("[{}] Received PartitionMetadataLookup", clientAddress);
         }
-
         final long clientRequestId = partitionMetadata.getRequestId();
+        if (this.service.getLookupRequestSemaphore().tryAcquire()) {
+            handlePartitionMetadataResponse(partitionMetadata, clientRequestId);
+            this.service.getLookupRequestSemaphore().release();
+        } else {
+            proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady,
+                    "Too many concurrent lookup requests", clientRequestId));
+        }
+    }
+
+    private void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata partitionMetadata,
+            long clientRequestId) {
         TopicName topicName = TopicName.get(partitionMetadata.getTopic());
         if (isBlank(brokerServiceURL)) {
-            service.getDiscoveryProvider().getPartitionedTopicMetadata(service, topicName, proxyConnection.clientAuthRole,
-                    proxyConnection.authenticationData)
-                    .thenAccept(metadata -> {
+            service.getDiscoveryProvider().getPartitionedTopicMetadata(service, topicName,
+                    proxyConnection.clientAuthRole, proxyConnection.authenticationData).thenAccept(metadata -> {
                         if (log.isDebugEnabled()) {
                             log.debug("[{}] Total number of partitions for topic {} is {}",
                                     proxyConnection.clientAuthRole, topicName, metadata.partitions);
@@ -202,18 +219,16 @@ public class LookupProxyHandler {
                 } else {
                     command = Commands.newPartitionMetadataRequest(topicName.toString(), requestId);
                 }
-                clientCnx.newLookup(
-                        command,
-                        requestId).thenAccept(lookupDataResult -> {
-                            proxyConnection.ctx().writeAndFlush(Commands
-                                    .newPartitionMetadataResponse(lookupDataResult.partitions, clientRequestId));
-                        }).exceptionally((ex) -> {
-                            log.warn("[{}] failed to get Partitioned metadata : {}", topicName.toString(),
-                                    ex.getCause().getMessage(), ex);
-                            proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(
-                                    ServerError.ServiceNotReady, ex.getMessage(), clientRequestId));
-                            return null;
-                        });
+                clientCnx.newLookup(command, requestId).thenAccept(lookupDataResult -> {
+                    proxyConnection.ctx().writeAndFlush(
+                            Commands.newPartitionMetadataResponse(lookupDataResult.partitions, clientRequestId));
+                }).exceptionally((ex) -> {
+                    log.warn("[{}] failed to get Partitioned metadata : {}", topicName.toString(),
+                            ex.getCause().getMessage(), ex);
+                    proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(ServerError.ServiceNotReady,
+                            ex.getMessage(), clientRequestId));
+                    return null;
+                });
             }).exceptionally(ex -> {
                 // Failed to connect to backend broker
                 proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady,
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index a8d3855..43b8d56 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -70,6 +70,12 @@ public class ProxyConfiguration implements PulsarConfiguration {
     // make sure authentication is enabled for this to take effect
     private boolean forwardAuthorizationCredentials = false;
 
+    // Max concurrent inbound Connections
+    private int maxConcurrentInboundConnections = 10000;
+
+    // Max concurrent lookup requests
+    private int maxConcurrentLookupRequests = 10000;
+
     // Authentication settings of the proxy itself. Used to connect to brokers
     private String brokerClientAuthenticationPlugin;
     private String brokerClientAuthenticationParameters;
@@ -335,7 +341,23 @@ public class ProxyConfiguration implements PulsarConfiguration {
     public void setTlsCiphers(Set<String> tlsCiphers) {
         this.tlsCiphers = tlsCiphers;
     }
-    
+
+    public int getMaxConcurrentInboundConnections() {
+        return maxConcurrentInboundConnections;
+    }
+
+    public void setMaxConcurrentInboundConnections(int maxConcurrentInboundConnections) {
+        this.maxConcurrentInboundConnections = maxConcurrentInboundConnections;
+    }
+
+    public int getMaxConcurrentLookupRequests() {
+        return maxConcurrentLookupRequests;
+    }
+
+    public void setMaxConcurrentLookupRequests(int maxConcurrentLookupRequests) {
+        this.maxConcurrentLookupRequests = maxConcurrentLookupRequests;
+    }
+
     public boolean getTlsRequireTrustedClientCertOnConnect() {
         return tlsRequireTrustedClientCertOnConnect;
     }
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 89d737d..e65461a 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -40,6 +40,8 @@ import org.slf4j.LoggerFactory;
 
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandler;
+import io.netty.channel.ChannelPipeline;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.FutureListener;
@@ -92,9 +94,27 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
     }
 
     @Override
+    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
+        super.channelRegistered(ctx);
+        activeConnections.inc();
+        if (activeConnections.get() > service.getConfiguration().getMaxConcurrentInboundConnections()) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("[{}] Too many connection opened {}", remoteAddress, activeConnections.get());
+            }
+            ctx.close();
+            return;
+        }
+    }
+
+    @Override
+    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+        super.channelUnregistered(ctx);
+        activeConnections.dec();
+    }
+    
+    @Override
     public void channelActive(ChannelHandlerContext ctx) throws Exception {
         super.channelActive(ctx);
-        activeConnections.inc();
         newConnections.inc();
         LOG.info("[{}] New connection opened", remoteAddress);
     }
@@ -102,7 +122,6 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
     @Override
     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
         super.channelInactive(ctx);
-        activeConnections.dec();
 
         if (directProxyHandler != null && directProxyHandler.outboundChannel != null) {
             directProxyHandler.outboundChannel.close();
@@ -164,7 +183,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
             close();
             return;
         }
-        
+
         if (connect.hasProxyToBrokerUrl()) {
             // Client already knows which broker to connect. Let's open a connection
             // there and just pass bytes in both directions
@@ -226,8 +245,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
                 sslSession = ((SslHandler) sslHandler).engine().getSession();
             }
             authenticationData = new AuthenticationDataCommand(authData, remoteAddress, sslSession);
-            clientAuthRole = service.getAuthenticationService()
-                    .authenticate(authenticationData, authMethod);
+            clientAuthRole = service.getAuthenticationService().authenticate(authenticationData, authMethod);
             LOG.info("[{}] Client successfully authenticated with {} role {}", remoteAddress, authMethod,
                     clientAuthRole);
             return true;
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 4337442..c77ace2 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -25,6 +25,8 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
@@ -78,12 +80,17 @@ public class ProxyService implements Closeable {
 
     private LocalZooKeeperConnectionService localZooKeeperConnectionService;
 
+    protected final AtomicReference<Semaphore> lookupRequestSemaphore;
+
     private static final int numThreads = Runtime.getRuntime().availableProcessors();
 
     public ProxyService(ProxyConfiguration proxyConfig) throws IOException {
         checkNotNull(proxyConfig);
         this.proxyConfig = proxyConfig;
 
+        this.lookupRequestSemaphore = new AtomicReference<Semaphore>(
+                new Semaphore(proxyConfig.getMaxConcurrentLookupRequests(), false));
+
         String hostname;
         try {
             hostname = InetAddress.getLocalHost().getHostName();
@@ -93,7 +100,7 @@ public class ProxyService implements Closeable {
         this.serviceUrl = String.format("pulsar://%s:%d/", hostname, proxyConfig.getServicePort());
         this.serviceUrlTls = String.format("pulsar://%s:%d/", hostname, proxyConfig.getServicePortTls());
 
-        this.acceptorGroup  = EventLoopUtil.newEventLoopGroup(1, acceptorThreadFactory);
+        this.acceptorGroup = EventLoopUtil.newEventLoopGroup(1, acceptorThreadFactory);
         this.workerGroup = EventLoopUtil.newEventLoopGroup(numThreads, workersThreadFactory);
 
         ClientConfigurationData clientConf = new ClientConfigurationData();
@@ -218,5 +225,9 @@ public class ProxyService implements Closeable {
         this.configurationCacheService = configurationCacheService;
     }
 
+    public Semaphore getLookupRequestSemaphore() {
+        return lookupRequestSemaphore.get();
+    }
+
     private static final Logger LOG = LoggerFactory.getLogger(ProxyService.class);
 }
\ No newline at end of file
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
new file mode 100644
index 0000000..008e751
--- /dev/null
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.mockito.Mockito.doReturn;
+
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class ProxyConnectionThrottlingTest extends MockedPulsarServiceBaseTest {
+
+    private final String DUMMY_VALUE = "DUMMY_VALUE";
+    private final int NUM_CONCURRENT_LOOKUP = 3;
+    private final int NUM_CONCURRENT_INBOUND_CONNECTION = 2;
+    private ProxyService proxyService;
+    private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+
+    @Override
+    @BeforeClass
+    protected void setup() throws Exception {
+        internalSetup();
+
+        proxyConfig.setServicePort(PortManager.nextFreePort());
+        proxyConfig.setZookeeperServers(DUMMY_VALUE);
+        proxyConfig.setGlobalZookeeperServers(DUMMY_VALUE);
+        proxyConfig.setMaxConcurrentLookupRequests(NUM_CONCURRENT_LOOKUP);
+        proxyConfig.setMaxConcurrentInboundConnections(NUM_CONCURRENT_INBOUND_CONNECTION);
+        proxyService = Mockito.spy(new ProxyService(proxyConfig));
+        doReturn(mockZooKeeperClientFactory).when(proxyService).getZooKeeperClientFactory();
+
+        proxyService.start();
+    }
+
+    @Override
+    @AfterClass
+    protected void cleanup() throws Exception {
+        internalCleanup();
+        proxyService.close();
+    }
+
+    @Test
+    public void testInboundConnection() throws Exception {
+        LOG.info("Creating producer 1");
+        PulsarClient client1 = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort())
+                .build();
+        Producer<byte[]> producer1 = client1.newProducer().topic("persistent://sample/test/local/producer-topic-1").create();
+        
+        LOG.info("Creating producer 2");
+        PulsarClient client2 = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort())
+                .build();
+        Producer<byte[]> producer2;
+        try {
+            producer2 = client2.newProducer().topic("persistent://sample/test/local/producer-topic-1").create();
+            producer2.send("Message 1".getBytes());
+            Assert.fail("Should have failed since max num of connections is 2 and the first producer used them all up - one for discovery and other for producing.");
+        } catch (Exception ex) {
+            // OK
+        }
+    }
+    
+    private static final Logger LOG = LoggerFactory.getLogger(ProxyConnectionThrottlingTest.class);
+}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
new file mode 100644
index 0000000..c661cae
--- /dev/null
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.mockito.Mockito.doReturn;
+import static org.testng.Assert.assertTrue;
+
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class ProxyLookupThrottlingTest extends MockedPulsarServiceBaseTest {
+
+    private final String DUMMY_VALUE = "DUMMY_VALUE";
+    private final int NUM_CONCURRENT_LOOKUP = 3;
+    private final int NUM_CONCURRENT_INBOUND_CONNECTION = 5;
+    private ProxyService proxyService;
+    private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+
+    @Override
+    @BeforeClass
+    protected void setup() throws Exception {
+        internalSetup();
+
+        proxyConfig.setServicePort(PortManager.nextFreePort());
+        proxyConfig.setZookeeperServers(DUMMY_VALUE);
+        proxyConfig.setGlobalZookeeperServers(DUMMY_VALUE);
+        proxyConfig.setMaxConcurrentLookupRequests(NUM_CONCURRENT_LOOKUP);
+        proxyConfig.setMaxConcurrentInboundConnections(NUM_CONCURRENT_INBOUND_CONNECTION);
+        proxyService = Mockito.spy(new ProxyService(proxyConfig));
+        doReturn(mockZooKeeperClientFactory).when(proxyService).getZooKeeperClientFactory();
+
+        proxyService.start();
+    }
+
+    @Override
+    @AfterClass
+    protected void cleanup() throws Exception {
+        internalCleanup();
+        proxyService.close();
+    }
+
+    @Test
+    public void testLookup() throws Exception {
+        PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort())
+                .connectionsPerBroker(5).ioThreads(5).build();
+        assertTrue(proxyService.getLookupRequestSemaphore().tryAcquire());
+        assertTrue(proxyService.getLookupRequestSemaphore().tryAcquire());
+        Producer<byte[]> producer1 = client.newProducer().topic("persistent://sample/test/local/producer-topic")
+                .create();
+        assertTrue(proxyService.getLookupRequestSemaphore().tryAcquire());
+        try {
+            Producer<byte[]> producer2 = client.newProducer().topic("persistent://sample/test/local/producer-topic")
+                    .create();
+            Assert.fail("Should have failed since can't acquire LookupRequestSemaphore");
+        } catch (Exception ex) {
+            // Ignore
+        }
+
+        proxyService.getLookupRequestSemaphore().release();
+        try {
+            Producer<byte[]> producer3 = client.newProducer().topic("persistent://sample/test/local/producer-topic")
+                    .create();
+        } catch (Exception ex) {
+            Assert.fail("Should not have failed since can acquire LookupRequestSemaphore");
+        }
+        client.close();
+    }
+}

-- 
To stop receiving notification emails like this one, please contact
jai1@apache.org.