You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ja...@apache.org on 2018/03/29 17:37:09 UTC
[incubator-pulsar] branch master updated: Added Throttling
mechanism to Pulsar Proxy (#1453)
This is an automated email from the ASF dual-hosted git repository.
jai1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 56e0109 Added Throttling mechanism to Pulsar Proxy (#1453)
56e0109 is described below
commit 56e0109657e3b53690d9c91bfe7a231474a36895
Author: Jai Asher <ja...@ccs.neu.edu>
AuthorDate: Thu Mar 29 10:37:05 2018 -0700
Added Throttling mechanism to Pulsar Proxy (#1453)
---
conf/proxy.conf | 7 ++
.../pulsar/proxy/server/DirectProxyHandler.java | 2 -
.../pulsar/proxy/server/LookupProxyHandler.java | 125 ++++++++++++---------
.../pulsar/proxy/server/ProxyConfiguration.java | 24 +++-
.../pulsar/proxy/server/ProxyConnection.java | 28 ++++-
.../apache/pulsar/proxy/server/ProxyService.java | 13 ++-
.../server/ProxyConnectionThrottlingTest.java | 87 ++++++++++++++
.../proxy/server/ProxyLookupThrottlingTest.java | 91 +++++++++++++++
8 files changed, 313 insertions(+), 64 deletions(-)
diff --git a/conf/proxy.conf b/conf/proxy.conf
index 5d0647d..f731240 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -69,6 +69,13 @@ superUserRoles=
# make sure authentication is enabled for this to take effect
forwardAuthorizationCredentials=false
+# --- RateLimiting ----
+# Max concurrent inbound connections; the proxy will reject connections beyond that. Default value is 10,000
+maxConcurrentInboundConnections=10000
+
+# Max concurrent lookup requests; the proxy will return an error for lookup requests beyond that. Default value is 10,000
+maxConcurrentLookupRequests=10000
+
##### --- TLS --- #####
# Enable TLS in the proxy
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index 92ff107..8b224f6 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -58,7 +58,6 @@ public class DirectProxyHandler {
private String originalPrincipal;
private String clientAuthData;
private String clientAuthMethod;
- private boolean forwardAuthData;
public static final String TLS_HANDLER = "tls";
private final Authentication authentication;
@@ -70,7 +69,6 @@ public class DirectProxyHandler {
this.clientAuthData = proxyConnection.clientAuthData;
this.clientAuthMethod = proxyConnection.clientAuthMethod;
ProxyConfiguration config = service.getConfiguration();
- this.forwardAuthData = service.getConfiguration().forwardAuthorizationCredentials();
// Start the connection attempt.
Bootstrap b = new Bootstrap();
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
index 6da5a89..aad42df 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
@@ -65,28 +65,37 @@ public class LookupProxyHandler {
if (log.isDebugEnabled()) {
log.debug("Received Lookup from {}", clientAddress);
}
-
- lookupRequests.inc();
long clientRequestId = lookup.getRequestId();
- String topic = lookup.getTopic();
- String serviceUrl;
- if (isBlank(brokerServiceURL)) {
- ServiceLookupData availableBroker = null;
- try {
- availableBroker = service.getDiscoveryProvider().nextBroker();
- } catch (Exception e) {
- log.warn("[{}] Failed to get next active broker {}", clientAddress, e.getMessage(), e);
- proxyConnection.ctx().writeAndFlush(
- Commands.newLookupErrorResponse(ServerError.ServiceNotReady, e.getMessage(), clientRequestId));
- return;
+ if (this.service.getLookupRequestSemaphore().tryAcquire()) {
+ lookupRequests.inc();
+ String topic = lookup.getTopic();
+ String serviceUrl;
+ if (isBlank(brokerServiceURL)) {
+ ServiceLookupData availableBroker = null;
+ try {
+ availableBroker = service.getDiscoveryProvider().nextBroker();
+ } catch (Exception e) {
+ log.warn("[{}] Failed to get next active broker {}", clientAddress, e.getMessage(), e);
+ proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(ServerError.ServiceNotReady,
+ e.getMessage(), clientRequestId));
+ return;
+ }
+ serviceUrl = this.connectWithTLS ? availableBroker.getPulsarServiceUrlTls()
+ : availableBroker.getPulsarServiceUrl();
+ } else {
+ serviceUrl = this.connectWithTLS ? service.getConfiguration().getBrokerServiceURLTLS()
+ : service.getConfiguration().getBrokerServiceURL();
}
- serviceUrl = this.connectWithTLS ? availableBroker.getPulsarServiceUrlTls()
- : availableBroker.getPulsarServiceUrl();
+ performLookup(clientRequestId, topic, serviceUrl, false, 10);
+ this.service.getLookupRequestSemaphore().release();
} else {
- serviceUrl = this.connectWithTLS ? service.getConfiguration().getBrokerServiceURLTLS()
- : service.getConfiguration().getBrokerServiceURL();
+ if (log.isDebugEnabled()) {
+ log.debug("Request ID {} from {} rejected - Too many concurrent lookup requests.", clientRequestId, clientAddress);
+ }
+ proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(ServerError.ServiceNotReady,
+ "Too many concurrent lookup requests", clientRequestId));
}
- performLookup(clientRequestId, topic, serviceUrl, false, 10);
+
}
private void performLookup(long clientRequestId, String topic, String brokerServiceUrl, boolean authoritative,
@@ -121,27 +130,26 @@ public class LookupProxyHandler {
} else {
command = Commands.newLookup(topic, authoritative, requestId);
}
- clientCnx.newLookup(command,
- requestId).thenAccept(result -> {
- String brokerUrl = connectWithTLS ? result.brokerUrlTls : result.brokerUrl;
- if (result.redirect) {
- // Need to try the lookup again on a different broker
- performLookup(clientRequestId, topic, brokerUrl, result.authoritative, numberOfRetries - 1);
- } else {
- // Reply the same address for both TLS non-TLS. The reason is that whether we use TLS
- // between proxy
- // and broker is independent of whether the client itself uses TLS, but we need to force the
- // client
- // to use the appropriate target broker (and port) when it will connect back.
- proxyConnection.ctx().writeAndFlush(Commands.newLookupResponse(brokerUrl, brokerUrl, true,
- LookupType.Connect, clientRequestId, true /* this is coming from proxy */));
- }
- }).exceptionally(ex -> {
- log.warn("[{}] Failed to lookup topic {}: {}", clientAddress, topic, ex.getMessage());
- proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(ServerError.ServiceNotReady,
- ex.getMessage(), clientRequestId));
- return null;
- });
+ clientCnx.newLookup(command, requestId).thenAccept(result -> {
+ String brokerUrl = connectWithTLS ? result.brokerUrlTls : result.brokerUrl;
+ if (result.redirect) {
+ // Need to try the lookup again on a different broker
+ performLookup(clientRequestId, topic, brokerUrl, result.authoritative, numberOfRetries - 1);
+ } else {
+ // Reply with the same address for both TLS and non-TLS. The reason is
+ // that whether we use TLS between the proxy and the broker is
+ // independent of whether the client itself uses TLS, but we need to
+ // force the client to use the appropriate target broker (and port)
+ // when it connects back.
+ proxyConnection.ctx().writeAndFlush(Commands.newLookupResponse(brokerUrl, brokerUrl, true,
+ LookupType.Connect, clientRequestId, true /* this is coming from proxy */));
+ }
+ }).exceptionally(ex -> {
+ log.warn("[{}] Failed to lookup topic {}: {}", clientAddress, topic, ex.getMessage());
+ proxyConnection.ctx().writeAndFlush(
+ Commands.newLookupErrorResponse(ServerError.ServiceNotReady, ex.getMessage(), clientRequestId));
+ return null;
+ });
}).exceptionally(ex -> {
// Failed to connect to backend broker
proxyConnection.ctx().writeAndFlush(
@@ -155,13 +163,22 @@ public class LookupProxyHandler {
if (log.isDebugEnabled()) {
log.debug("[{}] Received PartitionMetadataLookup", clientAddress);
}
-
final long clientRequestId = partitionMetadata.getRequestId();
+ if (this.service.getLookupRequestSemaphore().tryAcquire()) {
+ handlePartitionMetadataResponse(partitionMetadata, clientRequestId);
+ this.service.getLookupRequestSemaphore().release();
+ } else {
+ proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady,
+ "Too many concurrent lookup requests", clientRequestId));
+ }
+ }
+
+ private void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata partitionMetadata,
+ long clientRequestId) {
TopicName topicName = TopicName.get(partitionMetadata.getTopic());
if (isBlank(brokerServiceURL)) {
- service.getDiscoveryProvider().getPartitionedTopicMetadata(service, topicName, proxyConnection.clientAuthRole,
- proxyConnection.authenticationData)
- .thenAccept(metadata -> {
+ service.getDiscoveryProvider().getPartitionedTopicMetadata(service, topicName,
+ proxyConnection.clientAuthRole, proxyConnection.authenticationData).thenAccept(metadata -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Total number of partitions for topic {} is {}",
proxyConnection.clientAuthRole, topicName, metadata.partitions);
@@ -202,18 +219,16 @@ public class LookupProxyHandler {
} else {
command = Commands.newPartitionMetadataRequest(topicName.toString(), requestId);
}
- clientCnx.newLookup(
- command,
- requestId).thenAccept(lookupDataResult -> {
- proxyConnection.ctx().writeAndFlush(Commands
- .newPartitionMetadataResponse(lookupDataResult.partitions, clientRequestId));
- }).exceptionally((ex) -> {
- log.warn("[{}] failed to get Partitioned metadata : {}", topicName.toString(),
- ex.getCause().getMessage(), ex);
- proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(
- ServerError.ServiceNotReady, ex.getMessage(), clientRequestId));
- return null;
- });
+ clientCnx.newLookup(command, requestId).thenAccept(lookupDataResult -> {
+ proxyConnection.ctx().writeAndFlush(
+ Commands.newPartitionMetadataResponse(lookupDataResult.partitions, clientRequestId));
+ }).exceptionally((ex) -> {
+ log.warn("[{}] failed to get Partitioned metadata : {}", topicName.toString(),
+ ex.getCause().getMessage(), ex);
+ proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(ServerError.ServiceNotReady,
+ ex.getMessage(), clientRequestId));
+ return null;
+ });
}).exceptionally(ex -> {
// Failed to connect to backend broker
proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady,
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index a8d3855..43b8d56 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -70,6 +70,12 @@ public class ProxyConfiguration implements PulsarConfiguration {
// make sure authentication is enabled for this to take effect
private boolean forwardAuthorizationCredentials = false;
+ // Max concurrent inbound Connections
+ private int maxConcurrentInboundConnections = 10000;
+
+ // Max concurrent lookup requests
+ private int maxConcurrentLookupRequests = 10000;
+
// Authentication settings of the proxy itself. Used to connect to brokers
private String brokerClientAuthenticationPlugin;
private String brokerClientAuthenticationParameters;
@@ -335,7 +341,23 @@ public class ProxyConfiguration implements PulsarConfiguration {
public void setTlsCiphers(Set<String> tlsCiphers) {
this.tlsCiphers = tlsCiphers;
}
-
+
+ public int getMaxConcurrentInboundConnections() {
+ return maxConcurrentInboundConnections;
+ }
+
+ public void setMaxConcurrentInboundConnections(int maxConcurrentInboundConnections) {
+ this.maxConcurrentInboundConnections = maxConcurrentInboundConnections;
+ }
+
+ public int getMaxConcurrentLookupRequests() {
+ return maxConcurrentLookupRequests;
+ }
+
+ public void setMaxConcurrentLookupRequests(int maxConcurrentLookupRequests) {
+ this.maxConcurrentLookupRequests = maxConcurrentLookupRequests;
+ }
+
public boolean getTlsRequireTrustedClientCertOnConnect() {
return tlsRequireTrustedClientCertOnConnect;
}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 89d737d..e65461a 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -40,6 +40,8 @@ import org.slf4j.LoggerFactory;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandler;
+import io.netty.channel.ChannelPipeline;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
@@ -92,9 +94,27 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
}
@Override
+ public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
+ super.channelRegistered(ctx);
+ activeConnections.inc();
+ if (activeConnections.get() > service.getConfiguration().getMaxConcurrentInboundConnections()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("[{}] Too many connection opened {}", remoteAddress, activeConnections.get());
+ }
+ ctx.close();
+ return;
+ }
+ }
+
+ @Override
+ public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+ super.channelUnregistered(ctx);
+ activeConnections.dec();
+ }
+
+ @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
- activeConnections.inc();
newConnections.inc();
LOG.info("[{}] New connection opened", remoteAddress);
}
@@ -102,7 +122,6 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
- activeConnections.dec();
if (directProxyHandler != null && directProxyHandler.outboundChannel != null) {
directProxyHandler.outboundChannel.close();
@@ -164,7 +183,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
close();
return;
}
-
+
if (connect.hasProxyToBrokerUrl()) {
// Client already knows which broker to connect. Let's open a connection
// there and just pass bytes in both directions
@@ -226,8 +245,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
sslSession = ((SslHandler) sslHandler).engine().getSession();
}
authenticationData = new AuthenticationDataCommand(authData, remoteAddress, sslSession);
- clientAuthRole = service.getAuthenticationService()
- .authenticate(authenticationData, authMethod);
+ clientAuthRole = service.getAuthenticationService().authenticate(authenticationData, authMethod);
LOG.info("[{}] Client successfully authenticated with {} role {}", remoteAddress, authMethod,
clientAuthRole);
return true;
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 4337442..c77ace2 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -25,6 +25,8 @@ import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationService;
@@ -78,12 +80,17 @@ public class ProxyService implements Closeable {
private LocalZooKeeperConnectionService localZooKeeperConnectionService;
+ protected final AtomicReference<Semaphore> lookupRequestSemaphore;
+
private static final int numThreads = Runtime.getRuntime().availableProcessors();
public ProxyService(ProxyConfiguration proxyConfig) throws IOException {
checkNotNull(proxyConfig);
this.proxyConfig = proxyConfig;
+ this.lookupRequestSemaphore = new AtomicReference<Semaphore>(
+ new Semaphore(proxyConfig.getMaxConcurrentLookupRequests(), false));
+
String hostname;
try {
hostname = InetAddress.getLocalHost().getHostName();
@@ -93,7 +100,7 @@ public class ProxyService implements Closeable {
this.serviceUrl = String.format("pulsar://%s:%d/", hostname, proxyConfig.getServicePort());
this.serviceUrlTls = String.format("pulsar://%s:%d/", hostname, proxyConfig.getServicePortTls());
- this.acceptorGroup = EventLoopUtil.newEventLoopGroup(1, acceptorThreadFactory);
+ this.acceptorGroup = EventLoopUtil.newEventLoopGroup(1, acceptorThreadFactory);
this.workerGroup = EventLoopUtil.newEventLoopGroup(numThreads, workersThreadFactory);
ClientConfigurationData clientConf = new ClientConfigurationData();
@@ -218,5 +225,9 @@ public class ProxyService implements Closeable {
this.configurationCacheService = configurationCacheService;
}
+ public Semaphore getLookupRequestSemaphore() {
+ return lookupRequestSemaphore.get();
+ }
+
private static final Logger LOG = LoggerFactory.getLogger(ProxyService.class);
}
\ No newline at end of file
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
new file mode 100644
index 0000000..008e751
--- /dev/null
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.mockito.Mockito.doReturn;
+
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class ProxyConnectionThrottlingTest extends MockedPulsarServiceBaseTest {
+
+ private final String DUMMY_VALUE = "DUMMY_VALUE";
+ private final int NUM_CONCURRENT_LOOKUP = 3;
+ private final int NUM_CONCURRENT_INBOUND_CONNECTION = 2;
+ private ProxyService proxyService;
+ private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+
+ @Override
+ @BeforeClass
+ protected void setup() throws Exception {
+ internalSetup();
+
+ proxyConfig.setServicePort(PortManager.nextFreePort());
+ proxyConfig.setZookeeperServers(DUMMY_VALUE);
+ proxyConfig.setGlobalZookeeperServers(DUMMY_VALUE);
+ proxyConfig.setMaxConcurrentLookupRequests(NUM_CONCURRENT_LOOKUP);
+ proxyConfig.setMaxConcurrentInboundConnections(NUM_CONCURRENT_INBOUND_CONNECTION);
+ proxyService = Mockito.spy(new ProxyService(proxyConfig));
+ doReturn(mockZooKeeperClientFactory).when(proxyService).getZooKeeperClientFactory();
+
+ proxyService.start();
+ }
+
+ @Override
+ @AfterClass
+ protected void cleanup() throws Exception {
+ internalCleanup();
+ proxyService.close();
+ }
+
+ @Test
+ public void testInboundConnection() throws Exception {
+ LOG.info("Creating producer 1");
+ PulsarClient client1 = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort())
+ .build();
+ Producer<byte[]> producer1 = client1.newProducer().topic("persistent://sample/test/local/producer-topic-1").create();
+
+ LOG.info("Creating producer 2");
+ PulsarClient client2 = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort())
+ .build();
+ Producer<byte[]> producer2;
+ try {
+ producer2 = client2.newProducer().topic("persistent://sample/test/local/producer-topic-1").create();
+ producer2.send("Message 1".getBytes());
+ Assert.fail("Should have failed since max num of connections is 2 and the first producer used them all up - one for discovery and other for producing.");
+ } catch (Exception ex) {
+ // OK
+ }
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(ProxyConnectionThrottlingTest.class);
+}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
new file mode 100644
index 0000000..c661cae
--- /dev/null
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.mockito.Mockito.doReturn;
+import static org.testng.Assert.assertTrue;
+
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class ProxyLookupThrottlingTest extends MockedPulsarServiceBaseTest {
+
+ private final String DUMMY_VALUE = "DUMMY_VALUE";
+ private final int NUM_CONCURRENT_LOOKUP = 3;
+ private final int NUM_CONCURRENT_INBOUND_CONNECTION = 5;
+ private ProxyService proxyService;
+ private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+
+ @Override
+ @BeforeClass
+ protected void setup() throws Exception {
+ internalSetup();
+
+ proxyConfig.setServicePort(PortManager.nextFreePort());
+ proxyConfig.setZookeeperServers(DUMMY_VALUE);
+ proxyConfig.setGlobalZookeeperServers(DUMMY_VALUE);
+ proxyConfig.setMaxConcurrentLookupRequests(NUM_CONCURRENT_LOOKUP);
+ proxyConfig.setMaxConcurrentInboundConnections(NUM_CONCURRENT_INBOUND_CONNECTION);
+ proxyService = Mockito.spy(new ProxyService(proxyConfig));
+ doReturn(mockZooKeeperClientFactory).when(proxyService).getZooKeeperClientFactory();
+
+ proxyService.start();
+ }
+
+ @Override
+ @AfterClass
+ protected void cleanup() throws Exception {
+ internalCleanup();
+ proxyService.close();
+ }
+
+ @Test
+ public void testLookup() throws Exception {
+ PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort())
+ .connectionsPerBroker(5).ioThreads(5).build();
+ assertTrue(proxyService.getLookupRequestSemaphore().tryAcquire());
+ assertTrue(proxyService.getLookupRequestSemaphore().tryAcquire());
+ Producer<byte[]> producer1 = client.newProducer().topic("persistent://sample/test/local/producer-topic")
+ .create();
+ assertTrue(proxyService.getLookupRequestSemaphore().tryAcquire());
+ try {
+ Producer<byte[]> producer2 = client.newProducer().topic("persistent://sample/test/local/producer-topic")
+ .create();
+ Assert.fail("Should have failed since can't acquire LookupRequestSemaphore");
+ } catch (Exception ex) {
+ // Ignore
+ }
+
+ proxyService.getLookupRequestSemaphore().release();
+ try {
+ Producer<byte[]> producer3 = client.newProducer().topic("persistent://sample/test/local/producer-topic")
+ .create();
+ } catch (Exception ex) {
+ Assert.fail("Should not have failed since can acquire LookupRequestSemaphore");
+ }
+ client.close();
+ }
+}
--
To stop receiving notification emails like this one, please contact
jai1@apache.org.