You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/04/17 16:36:57 UTC

[incubator-pulsar] branch master updated: Add rate limit for client lookup requests (#1538)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f5a28f4  Add rate limit for client lookup requests (#1538)
f5a28f4 is described below

commit f5a28f4ff560e078d8cde0b76011da0eb5ab39f7
Author: Jia Zhai <zh...@gmail.com>
AuthorDate: Tue Apr 17 09:36:55 2018 -0700

    Add rate limit for client lookup requests (#1538)
    
    * add rate limit for client lookup request
    
    * change following comments
---
 .../pulsar/broker/service/BrokerServiceTest.java   | 33 ++++++++--
 .../apache/pulsar/client/api/ClientBuilder.java    | 12 +++-
 .../pulsar/client/impl/ClientBuilderImpl.java      |  6 ++
 .../org/apache/pulsar/client/impl/ClientCnx.java   | 76 +++++++++++++++++-----
 .../client/impl/conf/ClientConfigurationData.java  |  3 +-
 5 files changed, 105 insertions(+), 25 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index c375635..26aa09a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -698,13 +698,34 @@ public class BrokerServiceTest extends BrokerTestBase {
 
         String lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString();
         PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl).statsInterval(0, TimeUnit.SECONDS)
-                .maxConcurrentLookupRequests(0).build();
+                .maxConcurrentLookupRequests(1).maxLookupRequests(2).build();
 
+        // 2 lookup will success.
         try {
-            pulsarClient.newConsumer().topic(topicName).subscriptionName("mysub").subscribe();
-            fail("It should fail as throttling should not receive any request");
-        } catch (org.apache.pulsar.client.api.PulsarClientException.TooManyRequestsException e) {
-            // ok as throttling set to 0
+            CompletableFuture<Consumer<byte[]>> consumer1 = pulsarClient.newConsumer().topic(topicName).subscriptionName("mysub1").subscribeAsync();
+            CompletableFuture<Consumer<byte[]>> consumer2 = pulsarClient.newConsumer().topic(topicName).subscriptionName("mysub2").subscribeAsync();
+
+            consumer1.get().close();
+            consumer2.get().close();
+        } catch (Exception e) {
+            fail("Subscribe should success with 2 requests");
+        }
+
+        // 3 lookup will fail
+        try {
+            CompletableFuture<Consumer<byte[]>> consumer1 = pulsarClient.newConsumer().topic(topicName).subscriptionName("mysub11").subscribeAsync();
+            CompletableFuture<Consumer<byte[]>> consumer2 = pulsarClient.newConsumer().topic(topicName).subscriptionName("mysub22").subscribeAsync();
+            CompletableFuture<Consumer<byte[]>> consumer3 = pulsarClient.newConsumer().topic(topicName).subscriptionName("mysub33").subscribeAsync();
+
+            consumer1.get().close();
+            consumer2.get().close();
+            consumer3.get().close();
+            fail("It should fail as throttling should only receive 2 requests");
+        } catch (Exception e) {
+            if (!(e.getCause() instanceof
+                org.apache.pulsar.client.api.PulsarClientException.TooManyRequestsException)) {
+                fail("Subscribe should fail with TooManyRequestsException");
+            }
         }
     }
 
@@ -858,4 +879,4 @@ public class BrokerServiceTest extends BrokerTestBase {
         assertEquals(policy.get().bundles.numBundles, totalBundle);
     }
 
-}
\ No newline at end of file
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
index 418953d..071e292 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
@@ -246,7 +246,7 @@ public interface ClientBuilder extends Serializable, Cloneable {
     ClientBuilder statsInterval(long statsInterval, TimeUnit unit);
 
     /**
-     * Number of concurrent lookup-requests allowed on each broker-connection to prevent overload on broker.
+     * Number of concurrent lookup-requests allowed to send on each broker-connection to prevent overload on broker.
      * <i>(default: 5000)</i> It should be configured with higher value only in case of it requires to produce/subscribe
      * on thousands of topic using created {@link PulsarClient}
      *
@@ -255,6 +255,16 @@ public interface ClientBuilder extends Serializable, Cloneable {
     ClientBuilder maxConcurrentLookupRequests(int maxConcurrentLookupRequests);
 
     /**
+     * Number of max lookup-requests allowed on each broker-connection to prevent overload on broker.
+     * <i>(default: 50000)</i> It should be bigger than maxConcurrentLookupRequests.
+     * Requests that inside maxConcurrentLookupRequests already send to broker, and requests beyond
+     * maxConcurrentLookupRequests and under maxLookupRequests will wait in each client cnx.
+     *
+     * @param maxLookupRequests
+     */
+    ClientBuilder maxLookupRequests(int maxLookupRequests);
+
+    /**
      * Set max number of broker-rejected requests in a certain time-frame (30 seconds) after which current connection
      * will be closed and client creates a new connection that give chance to connect a different broker <i>(default:
      * 50)</i>
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index deb0625..ad4b510 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -150,6 +150,12 @@ public class ClientBuilderImpl implements ClientBuilder {
     }
 
     @Override
+    public ClientBuilder maxLookupRequests(int maxLookupRequests) {
+        conf.setMaxLookupRequest(maxLookupRequests);
+        return this;
+    }
+
+    @Override
     public ClientBuilder maxNumberOfRejectedRequestPerConnection(int maxNumberOfRejectedRequestPerConnection) {
         conf.setMaxNumberOfRejectedRequestPerConnection(maxNumberOfRejectedRequestPerConnection);
         return this;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 39aac44..038e6ae 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl;
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.pulsar.client.impl.HttpClient.getPulsarClientVersion;
 
+import com.google.common.collect.Queues;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandler;
@@ -33,11 +34,14 @@ import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.channels.ClosedChannelException;
 import java.util.List;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import javax.net.ssl.SSLSession;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.http.conn.ssl.DefaultHostnameVerifier;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -76,6 +80,8 @@ public class ClientCnx extends PulsarHandler {
         new ConcurrentLongHashMap<>(16, 1);
     private final ConcurrentLongHashMap<CompletableFuture<LookupDataResult>> pendingLookupRequests =
         new ConcurrentLongHashMap<>(16, 1);
+    // LookupRequests that waiting in client side.
+    private final BlockingQueue<Pair<Long, Pair<ByteBuf, CompletableFuture<LookupDataResult>>>> waitingLookupRequests;
     private final ConcurrentLongHashMap<CompletableFuture<MessageIdData>> pendingGetLastMessageIdRequests =
         new ConcurrentLongHashMap<>(16, 1);
     private final ConcurrentLongHashMap<CompletableFuture<List<String>>> pendingGetTopicsRequests =
@@ -108,7 +114,10 @@ public class ClientCnx extends PulsarHandler {
 
     public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) {
         super(conf.getKeepAliveIntervalSeconds(), TimeUnit.SECONDS);
+        checkArgument(conf.getMaxLookupRequest() > conf.getConcurrentLookupRequest());
         this.pendingLookupRequestSemaphore = new Semaphore(conf.getConcurrentLookupRequest(), true);
+        this.waitingLookupRequests = Queues
+            .newArrayBlockingQueue((conf.getMaxLookupRequest() - conf.getConcurrentLookupRequest()));
         this.authentication = conf.getAuthentication();
         this.eventLoopGroup = eventLoopGroup;
         this.maxNumberOfRejectedRequestPerConnection = conf.getMaxNumberOfRejectedRequestPerConnection();
@@ -163,12 +172,22 @@ public class ClientCnx extends PulsarHandler {
         // Fail out all the pending ops
         pendingRequests.forEach((key, future) -> future.completeExceptionally(e));
         pendingLookupRequests.forEach((key, future) -> future.completeExceptionally(e));
+        waitingLookupRequests.forEach(pair -> pair.getRight().getRight().completeExceptionally(e));
         pendingGetLastMessageIdRequests.forEach((key, future) -> future.completeExceptionally(e));
         pendingGetTopicsRequests.forEach((key, future) -> future.completeExceptionally(e));
 
         // Notify all attached producers/consumers so they have a chance to reconnect
         producers.forEach((id, producer) -> producer.connectionClosed(this));
         consumers.forEach((id, consumer) -> consumer.connectionClosed(this));
+
+        pendingRequests.clear();
+        pendingLookupRequests.clear();
+        waitingLookupRequests.clear();
+        pendingGetLastMessageIdRequests.clear();
+        pendingGetTopicsRequests.clear();
+
+        producers.clear();
+        consumers.clear();
     }
 
     // Command Handlers
@@ -399,24 +418,39 @@ public class ClientCnx extends PulsarHandler {
         }
     }
 
-    private boolean addPendingLookupRequests(long requestId, CompletableFuture<LookupDataResult> future) {
-        if (pendingLookupRequestSemaphore.tryAcquire()) {
-            pendingLookupRequests.put(requestId, future);
-            eventLoopGroup.schedule(() -> {
-                if (!future.isDone()) {
-                    future.completeExceptionally(new TimeoutException(
-                            requestId + " lookup request timedout after ms " + operationTimeoutMs));
-                }
-            }, operationTimeoutMs, TimeUnit.MILLISECONDS);
-            return true;
-        }
-        return false;
+    // caller of this method needs to be protected under pendingLookupRequestSemaphore
+    private void addPendingLookupRequests(long requestId, CompletableFuture<LookupDataResult> future) {
+        pendingLookupRequests.put(requestId, future);
+        eventLoopGroup.schedule(() -> {
+            if (!future.isDone()) {
+                future.completeExceptionally(new TimeoutException(
+                    requestId + " lookup request timedout after ms " + operationTimeoutMs));
+            }
+        }, operationTimeoutMs, TimeUnit.MILLISECONDS);
     }
 
     private CompletableFuture<LookupDataResult> getAndRemovePendingLookupRequest(long requestId) {
         CompletableFuture<LookupDataResult> result = pendingLookupRequests.remove(requestId);
         if (result != null) {
-            pendingLookupRequestSemaphore.release();
+            Pair<Long, Pair<ByteBuf, CompletableFuture<LookupDataResult>>> firstOneWaiting = waitingLookupRequests.poll();
+            if (firstOneWaiting != null) {
+                // schedule a new lookup in.
+                eventLoopGroup.submit(() -> {
+                    long newId = firstOneWaiting.getLeft();
+                    CompletableFuture<LookupDataResult> newFuture = firstOneWaiting.getRight().getRight();
+                    addPendingLookupRequests(newId, newFuture);
+                    ctx.writeAndFlush(firstOneWaiting.getRight().getLeft()).addListener(writeFuture -> {
+                        if (!writeFuture.isSuccess()) {
+                            log.warn("{} Failed to send request {} to broker: {}", ctx.channel(), newId,
+                                writeFuture.cause().getMessage());
+                            getAndRemovePendingLookupRequest(newId);
+                            newFuture.completeExceptionally(writeFuture.cause());
+                        }
+                    });
+                });
+            } else {
+                pendingLookupRequestSemaphore.release();
+            }
         }
         return result;
     }
@@ -495,11 +529,12 @@ public class ClientCnx extends PulsarHandler {
     public CompletableFuture<LookupDataResult> newLookup(ByteBuf request, long requestId) {
         CompletableFuture<LookupDataResult> future = new CompletableFuture<>();
 
-        if (addPendingLookupRequests(requestId, future)) {
+        if (pendingLookupRequestSemaphore.tryAcquire()) {
+            addPendingLookupRequests(requestId, future);
             ctx.writeAndFlush(request).addListener(writeFuture -> {
                 if (!writeFuture.isSuccess()) {
                     log.warn("{} Failed to send request {} to broker: {}", ctx.channel(), requestId,
-                            writeFuture.cause().getMessage());
+                        writeFuture.cause().getMessage());
                     getAndRemovePendingLookupRequest(requestId);
                     future.completeExceptionally(writeFuture.cause());
                 }
@@ -508,8 +543,15 @@ public class ClientCnx extends PulsarHandler {
             if (log.isDebugEnabled()) {
                 log.debug("{} Failed to add lookup-request into pending queue", requestId);
             }
-            future.completeExceptionally(new PulsarClientException.TooManyRequestsException(
-                    "Failed due to too many pending lookup requests"));
+            if (!waitingLookupRequests.offer(Pair.of(requestId, Pair.of(request, future)))) {
+                if (log.isDebugEnabled()) {
+                    log.debug("{} Failed to add lookup-request into waiting queue", requestId);
+                }
+                future.completeExceptionally(new PulsarClientException.TooManyRequestsException(String.format(
+                    "Requests number out of config: There are {%s} lookup requests outstanding and {%s} requests pending.",
+                    pendingLookupRequests.size(),
+                    waitingLookupRequests.size())));
+            }
         }
         return future;
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index 859bb1b..1d3280b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -51,7 +51,8 @@ public class ClientConfigurationData implements Serializable, Cloneable {
     private String tlsTrustCertsFilePath = "";
     private boolean tlsAllowInsecureConnection = false;
     private boolean tlsHostnameVerificationEnable = false;
-    private int concurrentLookupRequest = 50000;
+    private int concurrentLookupRequest = 5000;
+    private int maxLookupRequest = 50000;
     private int maxNumberOfRejectedRequestPerConnection = 50;
     private int keepAliveIntervalSeconds = 30;
 

-- 
To stop receiving notification emails like this one, please contact
mmerli@apache.org.