You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/06/01 14:43:15 UTC

[pulsar] 08/10: [Proxy] Prevent leak of unreleased lookupRequestSemaphore permits (#13812)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit dbe0518554bfb4306adf23335d62eb8959b83b27
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Wed Jan 19 06:41:12 2022 +0200

    [Proxy] Prevent leak of unreleased lookupRequestSemaphore permits (#13812)
    
    * [Proxy] Prevent leak of unreleased lookupRequestSemaphore permits
    
    - should release permit in try-finally block
    
    * Cleanup code in LookupProxyHandler
    
    (cherry picked from commit 85b62e050b01b591a4b5751aab48b418ac9e4e76)
    (cherry picked from commit dcc07e8ebacec86a3779b289e235dd7731aa208e)
---
 .../pulsar/proxy/server/LookupProxyHandler.java    | 96 ++++++++++------------
 1 file changed, 45 insertions(+), 51 deletions(-)

diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
index a8525e4da98..8c18d456cbb 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
@@ -18,14 +18,13 @@
  */
 package org.apache.pulsar.proxy.server;
 
-import static org.apache.commons.lang3.StringUtils.isBlank;
 
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Optional;
-
+import java.util.concurrent.Semaphore;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.protocol.Commands;
@@ -47,8 +46,8 @@ import io.prometheus.client.Counter;
 
 public class LookupProxyHandler {
     private final String throttlingErrorMessage = "Too many concurrent lookup and partitionsMetadata requests";
-    private final ProxyService service;
     private final ProxyConnection proxyConnection;
+    private final BrokerDiscoveryProvider discoveryProvider;
     private final boolean connectWithTLS;
 
     private SocketAddress clientAddress;
@@ -83,9 +82,11 @@ public class LookupProxyHandler {
             .build("pulsar_proxy_rejected_get_topics_of_namespace_requests",
                     "Counter of getTopicsOfNamespace requests rejected due to throttling")
             .create().register();
+    private final Semaphore lookupRequestSemaphore;
 
     public LookupProxyHandler(ProxyService proxy, ProxyConnection proxyConnection) {
-        this.service = proxy;
+        this.discoveryProvider = proxy.getDiscoveryProvider();
+        this.lookupRequestSemaphore = proxy.getLookupRequestSemaphore();
         this.proxyConnection = proxyConnection;
         this.clientAddress = proxyConnection.clientAddress();
         this.connectWithTLS = proxy.getConfiguration().isTlsEnabledWithBroker();
@@ -98,28 +99,16 @@ public class LookupProxyHandler {
             log.debug("Received Lookup from {}", clientAddress);
         }
         long clientRequestId = lookup.getRequestId();
-        if (this.service.getLookupRequestSemaphore().tryAcquire()) {
-            lookupRequests.inc();
-            String topic = lookup.getTopic();
-            String serviceUrl;
-            if (isBlank(brokerServiceURL)) {
-                ServiceLookupData availableBroker = null;
-                try {
-                    availableBroker = service.getDiscoveryProvider().nextBroker();
-                } catch (Exception e) {
-                    log.warn("[{}] Failed to get next active broker {}", clientAddress, e.getMessage(), e);
-                    proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(ServerError.ServiceNotReady,
-                            e.getMessage(), clientRequestId));
-                    return;
+        if (lookupRequestSemaphore.tryAcquire()) {
+            try {
+                lookupRequests.inc();
+                String serviceUrl = getBrokerServiceUrl(clientRequestId);
+                if (serviceUrl != null) {
+                    performLookup(clientRequestId, lookup.getTopic(), serviceUrl, false, 10);
                 }
-                serviceUrl = this.connectWithTLS ? availableBroker.getPulsarServiceUrlTls()
-                        : availableBroker.getPulsarServiceUrl();
-            } else {
-                serviceUrl = this.connectWithTLS ? service.getConfiguration().getBrokerServiceURLTLS()
-                        : service.getConfiguration().getBrokerServiceURL();
+            } finally {
+                lookupRequestSemaphore.release();
             }
-            performLookup(clientRequestId, topic, serviceUrl, false, 10);
-            this.service.getLookupRequestSemaphore().release();
         } else {
             rejectedLookupRequests.inc();
             if (log.isDebugEnabled()) {
@@ -203,9 +192,12 @@ public class LookupProxyHandler {
             log.debug("[{}] Received PartitionMetadataLookup", clientAddress);
         }
         final long clientRequestId = partitionMetadata.getRequestId();
-        if (this.service.getLookupRequestSemaphore().tryAcquire()) {
-            handlePartitionMetadataResponse(partitionMetadata, clientRequestId);
-            this.service.getLookupRequestSemaphore().release();
+        if (lookupRequestSemaphore.tryAcquire()) {
+            try {
+                handlePartitionMetadataResponse(partitionMetadata, clientRequestId);
+            } finally {
+                lookupRequestSemaphore.release();
+            }
         } else {
             rejectedPartitionsMetadataRequests.inc();
             if (log.isDebugEnabled()) {
@@ -226,7 +218,7 @@ public class LookupProxyHandler {
             long clientRequestId) {
         TopicName topicName = TopicName.get(partitionMetadata.getTopic());
 
-        String serviceUrl = getServiceUrl(clientRequestId);
+        String serviceUrl = getBrokerServiceUrl(clientRequestId);
         if (serviceUrl == null) {
             log.warn("No available broker for {} to lookup partition metadata", topicName);
             return;
@@ -273,9 +265,12 @@ public class LookupProxyHandler {
 
         final long requestId = commandGetTopicsOfNamespace.getRequestId();
 
-        if (this.service.getLookupRequestSemaphore().tryAcquire()) {
-            handleGetTopicsOfNamespace(commandGetTopicsOfNamespace, requestId);
-            this.service.getLookupRequestSemaphore().release();
+        if (lookupRequestSemaphore.tryAcquire()) {
+            try {
+                handleGetTopicsOfNamespace(commandGetTopicsOfNamespace, requestId);
+            } finally {
+                lookupRequestSemaphore.release();
+            }
         } else {
             rejectedGetTopicsOfNamespaceRequests.inc();
             if (log.isDebugEnabled()) {
@@ -290,7 +285,7 @@ public class LookupProxyHandler {
 
     private void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace,
                                             long clientRequestId) {
-        String serviceUrl = getServiceUrl(clientRequestId);
+        String serviceUrl = getBrokerServiceUrl(clientRequestId);
 
         if(!StringUtils.isNotBlank(serviceUrl)) {
             return;
@@ -352,7 +347,7 @@ public class LookupProxyHandler {
         }
 
         final long clientRequestId = commandGetSchema.getRequestId();
-        String serviceUrl = getServiceUrl(clientRequestId);
+        String serviceUrl = getBrokerServiceUrl(clientRequestId);
         String topic = commandGetSchema.getTopic();
         Optional<SchemaVersion> schemaVersion;
         if (commandGetSchema.hasSchemaVersion()) {
@@ -402,25 +397,24 @@ public class LookupProxyHandler {
 
     }
 
-    private String getServiceUrl(long clientRequestId) {
-        if (isBlank(brokerServiceURL)) {
-            ServiceLookupData availableBroker;
-            try {
-                availableBroker = service.getDiscoveryProvider().nextBroker();
-            } catch (Exception e) {
-                log.warn("[{}] Failed to get next active broker {}", clientAddress, e.getMessage(), e);
-                proxyConnection.ctx().writeAndFlush(Commands.newError(
-                        clientRequestId, ServerError.ServiceNotReady, e.getMessage()
-                ));
-                return null;
-            }
-            return this.connectWithTLS ?
-                    availableBroker.getPulsarServiceUrlTls() : availableBroker.getPulsarServiceUrl();
-        } else {
-            return this.connectWithTLS ?
-                    service.getConfiguration().getBrokerServiceURLTLS() : service.getConfiguration().getBrokerServiceURL();
+    /**
+     *  Get default broker service url or discovery an available broker.
+     **/
+    private String getBrokerServiceUrl(long clientRequestId) {
+        if (StringUtils.isNotBlank(brokerServiceURL)) {
+            return brokerServiceURL;
         }
-
+        ServiceLookupData availableBroker;
+        try {
+            availableBroker = discoveryProvider.nextBroker();
+        } catch (Exception e) {
+            log.warn("[{}] Failed to get next active broker {}", clientAddress, e.getMessage(), e);
+            proxyConnection.ctx().writeAndFlush(Commands.newError(
+                    clientRequestId, ServerError.ServiceNotReady, e.getMessage()
+            ));
+            return null;
+        }
+        return this.connectWithTLS ? availableBroker.getPulsarServiceUrlTls() : availableBroker.getPulsarServiceUrl();
     }
 
     private InetSocketAddress getAddr(String brokerServiceUrl, long clientRequestId) {