You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/06/01 14:43:15 UTC
[pulsar] 08/10: [Proxy] Prevent leak of unreleased lookupRequestSemaphore permits (#13812)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit dbe0518554bfb4306adf23335d62eb8959b83b27
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Wed Jan 19 06:41:12 2022 +0200
[Proxy] Prevent leak of unreleased lookupRequestSemaphore permits (#13812)
* [Proxy] Prevent leak of unreleased lookupRequestSemaphore permits
- should release permit in try-finally block
* Cleanup code in LookupProxyHandler
(cherry picked from commit 85b62e050b01b591a4b5751aab48b418ac9e4e76)
(cherry picked from commit dcc07e8ebacec86a3779b289e235dd7731aa208e)
---
.../pulsar/proxy/server/LookupProxyHandler.java | 96 ++++++++++------------
1 file changed, 45 insertions(+), 51 deletions(-)
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
index a8525e4da98..8c18d456cbb 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
@@ -18,14 +18,13 @@
*/
package org.apache.pulsar.proxy.server;
-import static org.apache.commons.lang3.StringUtils.isBlank;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Optional;
-
+import java.util.concurrent.Semaphore;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.protocol.Commands;
@@ -47,8 +46,8 @@ import io.prometheus.client.Counter;
public class LookupProxyHandler {
private final String throttlingErrorMessage = "Too many concurrent lookup and partitionsMetadata requests";
- private final ProxyService service;
private final ProxyConnection proxyConnection;
+ private final BrokerDiscoveryProvider discoveryProvider;
private final boolean connectWithTLS;
private SocketAddress clientAddress;
@@ -83,9 +82,11 @@ public class LookupProxyHandler {
.build("pulsar_proxy_rejected_get_topics_of_namespace_requests",
"Counter of getTopicsOfNamespace requests rejected due to throttling")
.create().register();
+ private final Semaphore lookupRequestSemaphore;
public LookupProxyHandler(ProxyService proxy, ProxyConnection proxyConnection) {
- this.service = proxy;
+ this.discoveryProvider = proxy.getDiscoveryProvider();
+ this.lookupRequestSemaphore = proxy.getLookupRequestSemaphore();
this.proxyConnection = proxyConnection;
this.clientAddress = proxyConnection.clientAddress();
this.connectWithTLS = proxy.getConfiguration().isTlsEnabledWithBroker();
@@ -98,28 +99,16 @@ public class LookupProxyHandler {
log.debug("Received Lookup from {}", clientAddress);
}
long clientRequestId = lookup.getRequestId();
- if (this.service.getLookupRequestSemaphore().tryAcquire()) {
- lookupRequests.inc();
- String topic = lookup.getTopic();
- String serviceUrl;
- if (isBlank(brokerServiceURL)) {
- ServiceLookupData availableBroker = null;
- try {
- availableBroker = service.getDiscoveryProvider().nextBroker();
- } catch (Exception e) {
- log.warn("[{}] Failed to get next active broker {}", clientAddress, e.getMessage(), e);
- proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(ServerError.ServiceNotReady,
- e.getMessage(), clientRequestId));
- return;
+ if (lookupRequestSemaphore.tryAcquire()) {
+ try {
+ lookupRequests.inc();
+ String serviceUrl = getBrokerServiceUrl(clientRequestId);
+ if (serviceUrl != null) {
+ performLookup(clientRequestId, lookup.getTopic(), serviceUrl, false, 10);
}
- serviceUrl = this.connectWithTLS ? availableBroker.getPulsarServiceUrlTls()
- : availableBroker.getPulsarServiceUrl();
- } else {
- serviceUrl = this.connectWithTLS ? service.getConfiguration().getBrokerServiceURLTLS()
- : service.getConfiguration().getBrokerServiceURL();
+ } finally {
+ lookupRequestSemaphore.release();
}
- performLookup(clientRequestId, topic, serviceUrl, false, 10);
- this.service.getLookupRequestSemaphore().release();
} else {
rejectedLookupRequests.inc();
if (log.isDebugEnabled()) {
@@ -203,9 +192,12 @@ public class LookupProxyHandler {
log.debug("[{}] Received PartitionMetadataLookup", clientAddress);
}
final long clientRequestId = partitionMetadata.getRequestId();
- if (this.service.getLookupRequestSemaphore().tryAcquire()) {
- handlePartitionMetadataResponse(partitionMetadata, clientRequestId);
- this.service.getLookupRequestSemaphore().release();
+ if (lookupRequestSemaphore.tryAcquire()) {
+ try {
+ handlePartitionMetadataResponse(partitionMetadata, clientRequestId);
+ } finally {
+ lookupRequestSemaphore.release();
+ }
} else {
rejectedPartitionsMetadataRequests.inc();
if (log.isDebugEnabled()) {
@@ -226,7 +218,7 @@ public class LookupProxyHandler {
long clientRequestId) {
TopicName topicName = TopicName.get(partitionMetadata.getTopic());
- String serviceUrl = getServiceUrl(clientRequestId);
+ String serviceUrl = getBrokerServiceUrl(clientRequestId);
if (serviceUrl == null) {
log.warn("No available broker for {} to lookup partition metadata", topicName);
return;
@@ -273,9 +265,12 @@ public class LookupProxyHandler {
final long requestId = commandGetTopicsOfNamespace.getRequestId();
- if (this.service.getLookupRequestSemaphore().tryAcquire()) {
- handleGetTopicsOfNamespace(commandGetTopicsOfNamespace, requestId);
- this.service.getLookupRequestSemaphore().release();
+ if (lookupRequestSemaphore.tryAcquire()) {
+ try {
+ handleGetTopicsOfNamespace(commandGetTopicsOfNamespace, requestId);
+ } finally {
+ lookupRequestSemaphore.release();
+ }
} else {
rejectedGetTopicsOfNamespaceRequests.inc();
if (log.isDebugEnabled()) {
@@ -290,7 +285,7 @@ public class LookupProxyHandler {
private void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace,
long clientRequestId) {
- String serviceUrl = getServiceUrl(clientRequestId);
+ String serviceUrl = getBrokerServiceUrl(clientRequestId);
if(!StringUtils.isNotBlank(serviceUrl)) {
return;
@@ -352,7 +347,7 @@ public class LookupProxyHandler {
}
final long clientRequestId = commandGetSchema.getRequestId();
- String serviceUrl = getServiceUrl(clientRequestId);
+ String serviceUrl = getBrokerServiceUrl(clientRequestId);
String topic = commandGetSchema.getTopic();
Optional<SchemaVersion> schemaVersion;
if (commandGetSchema.hasSchemaVersion()) {
@@ -402,25 +397,24 @@ public class LookupProxyHandler {
}
- private String getServiceUrl(long clientRequestId) {
- if (isBlank(brokerServiceURL)) {
- ServiceLookupData availableBroker;
- try {
- availableBroker = service.getDiscoveryProvider().nextBroker();
- } catch (Exception e) {
- log.warn("[{}] Failed to get next active broker {}", clientAddress, e.getMessage(), e);
- proxyConnection.ctx().writeAndFlush(Commands.newError(
- clientRequestId, ServerError.ServiceNotReady, e.getMessage()
- ));
- return null;
- }
- return this.connectWithTLS ?
- availableBroker.getPulsarServiceUrlTls() : availableBroker.getPulsarServiceUrl();
- } else {
- return this.connectWithTLS ?
- service.getConfiguration().getBrokerServiceURLTLS() : service.getConfiguration().getBrokerServiceURL();
+ /**
+ * Get default broker service url or discover an available broker.
+ **/
+ private String getBrokerServiceUrl(long clientRequestId) {
+ if (StringUtils.isNotBlank(brokerServiceURL)) {
+ return brokerServiceURL;
}
-
+ ServiceLookupData availableBroker;
+ try {
+ availableBroker = discoveryProvider.nextBroker();
+ } catch (Exception e) {
+ log.warn("[{}] Failed to get next active broker {}", clientAddress, e.getMessage(), e);
+ proxyConnection.ctx().writeAndFlush(Commands.newError(
+ clientRequestId, ServerError.ServiceNotReady, e.getMessage()
+ ));
+ return null;
+ }
+ return this.connectWithTLS ? availableBroker.getPulsarServiceUrlTls() : availableBroker.getPulsarServiceUrl();
}
private InetSocketAddress getAddr(String brokerServiceUrl, long clientRequestId) {