You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/04/17 16:36:57 UTC
[incubator-pulsar] branch master updated: Add rate limit for client
lookup requests (#1538)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f5a28f4 Add rate limit for client lookup requests (#1538)
f5a28f4 is described below
commit f5a28f4ff560e078d8cde0b76011da0eb5ab39f7
Author: Jia Zhai <zh...@gmail.com>
AuthorDate: Tue Apr 17 09:36:55 2018 -0700
Add rate limit for client lookup requests (#1538)
* add rate limit for client lookup request
* change following comments
---
.../pulsar/broker/service/BrokerServiceTest.java | 33 ++++++++--
.../apache/pulsar/client/api/ClientBuilder.java | 12 +++-
.../pulsar/client/impl/ClientBuilderImpl.java | 6 ++
.../org/apache/pulsar/client/impl/ClientCnx.java | 76 +++++++++++++++++-----
.../client/impl/conf/ClientConfigurationData.java | 3 +-
5 files changed, 105 insertions(+), 25 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index c375635..26aa09a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -698,13 +698,34 @@ public class BrokerServiceTest extends BrokerTestBase {
String lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString();
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl).statsInterval(0, TimeUnit.SECONDS)
- .maxConcurrentLookupRequests(0).build();
+ .maxConcurrentLookupRequests(1).maxLookupRequests(2).build();
+ // 2 lookup will success.
try {
- pulsarClient.newConsumer().topic(topicName).subscriptionName("mysub").subscribe();
- fail("It should fail as throttling should not receive any request");
- } catch (org.apache.pulsar.client.api.PulsarClientException.TooManyRequestsException e) {
- // ok as throttling set to 0
+ CompletableFuture<Consumer<byte[]>> consumer1 = pulsarClient.newConsumer().topic(topicName).subscriptionName("mysub1").subscribeAsync();
+ CompletableFuture<Consumer<byte[]>> consumer2 = pulsarClient.newConsumer().topic(topicName).subscriptionName("mysub2").subscribeAsync();
+
+ consumer1.get().close();
+ consumer2.get().close();
+ } catch (Exception e) {
+ fail("Subscribe should success with 2 requests");
+ }
+
+ // 3 lookup will fail
+ try {
+ CompletableFuture<Consumer<byte[]>> consumer1 = pulsarClient.newConsumer().topic(topicName).subscriptionName("mysub11").subscribeAsync();
+ CompletableFuture<Consumer<byte[]>> consumer2 = pulsarClient.newConsumer().topic(topicName).subscriptionName("mysub22").subscribeAsync();
+ CompletableFuture<Consumer<byte[]>> consumer3 = pulsarClient.newConsumer().topic(topicName).subscriptionName("mysub33").subscribeAsync();
+
+ consumer1.get().close();
+ consumer2.get().close();
+ consumer3.get().close();
+ fail("It should fail as throttling should only receive 2 requests");
+ } catch (Exception e) {
+ if (!(e.getCause() instanceof
+ org.apache.pulsar.client.api.PulsarClientException.TooManyRequestsException)) {
+ fail("Subscribe should fail with TooManyRequestsException");
+ }
}
}
@@ -858,4 +879,4 @@ public class BrokerServiceTest extends BrokerTestBase {
assertEquals(policy.get().bundles.numBundles, totalBundle);
}
-}
\ No newline at end of file
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
index 418953d..071e292 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
@@ -246,7 +246,7 @@ public interface ClientBuilder extends Serializable, Cloneable {
ClientBuilder statsInterval(long statsInterval, TimeUnit unit);
/**
- * Number of concurrent lookup-requests allowed on each broker-connection to prevent overload on broker.
+ * Number of concurrent lookup-requests allowed to send on each broker-connection to prevent overload on broker.
* <i>(default: 5000)</i> It should be configured with higher value only in case of it requires to produce/subscribe
* on thousands of topic using created {@link PulsarClient}
*
@@ -255,6 +255,16 @@ public interface ClientBuilder extends Serializable, Cloneable {
ClientBuilder maxConcurrentLookupRequests(int maxConcurrentLookupRequests);
/**
+ * Number of max lookup-requests allowed on each broker-connection to prevent overload on broker.
+ * <i>(default: 50000)</i> It should be bigger than maxConcurrentLookupRequests.
+ * Requests that inside maxConcurrentLookupRequests already send to broker, and requests beyond
+ * maxConcurrentLookupRequests and under maxLookupRequests will wait in each client cnx.
+ *
+ * @param maxLookupRequests
+ */
+ ClientBuilder maxLookupRequests(int maxLookupRequests);
+
+ /**
* Set max number of broker-rejected requests in a certain time-frame (30 seconds) after which current connection
* will be closed and client creates a new connection that give chance to connect a different broker <i>(default:
* 50)</i>
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index deb0625..ad4b510 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -150,6 +150,12 @@ public class ClientBuilderImpl implements ClientBuilder {
}
@Override
+ public ClientBuilder maxLookupRequests(int maxLookupRequests) {
+ conf.setMaxLookupRequest(maxLookupRequests);
+ return this;
+ }
+
+ @Override
public ClientBuilder maxNumberOfRejectedRequestPerConnection(int maxNumberOfRejectedRequestPerConnection) {
conf.setMaxNumberOfRejectedRequestPerConnection(maxNumberOfRejectedRequestPerConnection);
return this;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 39aac44..038e6ae 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.client.impl.HttpClient.getPulsarClientVersion;
+import com.google.common.collect.Queues;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
@@ -33,11 +34,14 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.List;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import javax.net.ssl.SSLSession;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.http.conn.ssl.DefaultHostnameVerifier;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -76,6 +80,8 @@ public class ClientCnx extends PulsarHandler {
new ConcurrentLongHashMap<>(16, 1);
private final ConcurrentLongHashMap<CompletableFuture<LookupDataResult>> pendingLookupRequests =
new ConcurrentLongHashMap<>(16, 1);
+ // LookupRequests that waiting in client side.
+ private final BlockingQueue<Pair<Long, Pair<ByteBuf, CompletableFuture<LookupDataResult>>>> waitingLookupRequests;
private final ConcurrentLongHashMap<CompletableFuture<MessageIdData>> pendingGetLastMessageIdRequests =
new ConcurrentLongHashMap<>(16, 1);
private final ConcurrentLongHashMap<CompletableFuture<List<String>>> pendingGetTopicsRequests =
@@ -108,7 +114,10 @@ public class ClientCnx extends PulsarHandler {
public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) {
super(conf.getKeepAliveIntervalSeconds(), TimeUnit.SECONDS);
+ checkArgument(conf.getMaxLookupRequest() > conf.getConcurrentLookupRequest());
this.pendingLookupRequestSemaphore = new Semaphore(conf.getConcurrentLookupRequest(), true);
+ this.waitingLookupRequests = Queues
+ .newArrayBlockingQueue((conf.getMaxLookupRequest() - conf.getConcurrentLookupRequest()));
this.authentication = conf.getAuthentication();
this.eventLoopGroup = eventLoopGroup;
this.maxNumberOfRejectedRequestPerConnection = conf.getMaxNumberOfRejectedRequestPerConnection();
@@ -163,12 +172,22 @@ public class ClientCnx extends PulsarHandler {
// Fail out all the pending ops
pendingRequests.forEach((key, future) -> future.completeExceptionally(e));
pendingLookupRequests.forEach((key, future) -> future.completeExceptionally(e));
+ waitingLookupRequests.forEach(pair -> pair.getRight().getRight().completeExceptionally(e));
pendingGetLastMessageIdRequests.forEach((key, future) -> future.completeExceptionally(e));
pendingGetTopicsRequests.forEach((key, future) -> future.completeExceptionally(e));
// Notify all attached producers/consumers so they have a chance to reconnect
producers.forEach((id, producer) -> producer.connectionClosed(this));
consumers.forEach((id, consumer) -> consumer.connectionClosed(this));
+
+ pendingRequests.clear();
+ pendingLookupRequests.clear();
+ waitingLookupRequests.clear();
+ pendingGetLastMessageIdRequests.clear();
+ pendingGetTopicsRequests.clear();
+
+ producers.clear();
+ consumers.clear();
}
// Command Handlers
@@ -399,24 +418,39 @@ public class ClientCnx extends PulsarHandler {
}
}
- private boolean addPendingLookupRequests(long requestId, CompletableFuture<LookupDataResult> future) {
- if (pendingLookupRequestSemaphore.tryAcquire()) {
- pendingLookupRequests.put(requestId, future);
- eventLoopGroup.schedule(() -> {
- if (!future.isDone()) {
- future.completeExceptionally(new TimeoutException(
- requestId + " lookup request timedout after ms " + operationTimeoutMs));
- }
- }, operationTimeoutMs, TimeUnit.MILLISECONDS);
- return true;
- }
- return false;
+ // caller of this method needs to be protected under pendingLookupRequestSemaphore
+ private void addPendingLookupRequests(long requestId, CompletableFuture<LookupDataResult> future) {
+ pendingLookupRequests.put(requestId, future);
+ eventLoopGroup.schedule(() -> {
+ if (!future.isDone()) {
+ future.completeExceptionally(new TimeoutException(
+ requestId + " lookup request timedout after ms " + operationTimeoutMs));
+ }
+ }, operationTimeoutMs, TimeUnit.MILLISECONDS);
}
private CompletableFuture<LookupDataResult> getAndRemovePendingLookupRequest(long requestId) {
CompletableFuture<LookupDataResult> result = pendingLookupRequests.remove(requestId);
if (result != null) {
- pendingLookupRequestSemaphore.release();
+ Pair<Long, Pair<ByteBuf, CompletableFuture<LookupDataResult>>> firstOneWaiting = waitingLookupRequests.poll();
+ if (firstOneWaiting != null) {
+ // schedule a new lookup in.
+ eventLoopGroup.submit(() -> {
+ long newId = firstOneWaiting.getLeft();
+ CompletableFuture<LookupDataResult> newFuture = firstOneWaiting.getRight().getRight();
+ addPendingLookupRequests(newId, newFuture);
+ ctx.writeAndFlush(firstOneWaiting.getRight().getLeft()).addListener(writeFuture -> {
+ if (!writeFuture.isSuccess()) {
+ log.warn("{} Failed to send request {} to broker: {}", ctx.channel(), newId,
+ writeFuture.cause().getMessage());
+ getAndRemovePendingLookupRequest(newId);
+ newFuture.completeExceptionally(writeFuture.cause());
+ }
+ });
+ });
+ } else {
+ pendingLookupRequestSemaphore.release();
+ }
}
return result;
}
@@ -495,11 +529,12 @@ public class ClientCnx extends PulsarHandler {
public CompletableFuture<LookupDataResult> newLookup(ByteBuf request, long requestId) {
CompletableFuture<LookupDataResult> future = new CompletableFuture<>();
- if (addPendingLookupRequests(requestId, future)) {
+ if (pendingLookupRequestSemaphore.tryAcquire()) {
+ addPendingLookupRequests(requestId, future);
ctx.writeAndFlush(request).addListener(writeFuture -> {
if (!writeFuture.isSuccess()) {
log.warn("{} Failed to send request {} to broker: {}", ctx.channel(), requestId,
- writeFuture.cause().getMessage());
+ writeFuture.cause().getMessage());
getAndRemovePendingLookupRequest(requestId);
future.completeExceptionally(writeFuture.cause());
}
@@ -508,8 +543,15 @@ public class ClientCnx extends PulsarHandler {
if (log.isDebugEnabled()) {
log.debug("{} Failed to add lookup-request into pending queue", requestId);
}
- future.completeExceptionally(new PulsarClientException.TooManyRequestsException(
- "Failed due to too many pending lookup requests"));
+ if (!waitingLookupRequests.offer(Pair.of(requestId, Pair.of(request, future)))) {
+ if (log.isDebugEnabled()) {
+ log.debug("{} Failed to add lookup-request into waiting queue", requestId);
+ }
+ future.completeExceptionally(new PulsarClientException.TooManyRequestsException(String.format(
+ "Requests number out of config: There are {%s} lookup requests outstanding and {%s} requests pending.",
+ pendingLookupRequests.size(),
+ waitingLookupRequests.size())));
+ }
}
return future;
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index 859bb1b..1d3280b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -51,7 +51,8 @@ public class ClientConfigurationData implements Serializable, Cloneable {
private String tlsTrustCertsFilePath = "";
private boolean tlsAllowInsecureConnection = false;
private boolean tlsHostnameVerificationEnable = false;
- private int concurrentLookupRequest = 50000;
+ private int concurrentLookupRequest = 5000;
+ private int maxLookupRequest = 50000;
private int maxNumberOfRejectedRequestPerConnection = 50;
private int keepAliveIntervalSeconds = 30;
--
To stop receiving notification emails like this one, please contact
mmerli@apache.org.