You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/06/01 14:43:14 UTC

[pulsar] 07/10: [Proxy] Remove unnecessary blocking DNS lookup in LookupProxyHandler (#15415)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 462d44f906fa4dd873c3e4767fcdce84bea20f48
Author: Lari Hotari <lh...@apache.org>
AuthorDate: Wed May 4 08:45:00 2022 +0300

    [Proxy] Remove unnecessary blocking DNS lookup in LookupProxyHandler (#15415)
    
    * [Proxy] Remove unnecessary blocking DNS lookup in LookupProxyHandler
    
    * Use existing code pattern for creating address
    
    (cherry picked from commit 7373a51690d728475d47846bfbcca4fa64f2e228)
    (cherry picked from commit 5980cdc109736ed7e0df7df0e031dcde73e99536)
---
 .../pulsar/proxy/server/LookupProxyHandler.java    | 45 ++++++++++------------
 1 file changed, 21 insertions(+), 24 deletions(-)

diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
index 82300a3ae7f..a8525e4da98 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
@@ -37,6 +37,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetad
 import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
+import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -224,20 +225,16 @@ public class LookupProxyHandler {
     private void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata partitionMetadata,
             long clientRequestId) {
         TopicName topicName = TopicName.get(partitionMetadata.getTopic());
-        URI brokerURI;
-        try {
-            String availableBrokerServiceURL = getBrokerServiceUrl(clientRequestId);
-            if (availableBrokerServiceURL == null) {
-                log.warn("No available broker for {} to lookup partition metadata", topicName);
-                return;
-            }
-            brokerURI = new URI(availableBrokerServiceURL);
-        } catch (URISyntaxException e) {
-            proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.MetadataError,
-                    e.getMessage(), clientRequestId));
+
+        String serviceUrl = getServiceUrl(clientRequestId);
+        if (serviceUrl == null) {
+            log.warn("No available broker for {} to lookup partition metadata", topicName);
+            return;
+        }
+        InetSocketAddress addr = getAddr(serviceUrl, clientRequestId);
+        if (addr == null) {
             return;
         }
-        InetSocketAddress addr = new InetSocketAddress(brokerURI.getHost(), brokerURI.getPort());
 
         if (log.isDebugEnabled()) {
             log.debug("Getting connections to '{}' for Looking up topic '{}' with clientReq Id '{}'", addr,
@@ -293,7 +290,7 @@ public class LookupProxyHandler {
 
     private void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace,
                                             long clientRequestId) {
-        String serviceUrl = getBrokerServiceUrl(clientRequestId);
+        String serviceUrl = getServiceUrl(clientRequestId);
 
         if(!StringUtils.isNotBlank(serviceUrl)) {
             return;
@@ -355,8 +352,14 @@ public class LookupProxyHandler {
         }
 
         final long clientRequestId = commandGetSchema.getRequestId();
-        String serviceUrl = getBrokerServiceUrl(clientRequestId);
+        String serviceUrl = getServiceUrl(clientRequestId);
         String topic = commandGetSchema.getTopic();
+        Optional<SchemaVersion> schemaVersion;
+        if (commandGetSchema.hasSchemaVersion()) {
+            schemaVersion = Optional.of(commandGetSchema.getSchemaVersion().toByteArray()).map(BytesSchemaVersion::of);
+        } else {
+            schemaVersion = Optional.empty();
+        }
 
         if(!StringUtils.isNotBlank(serviceUrl)) {
             return;
@@ -375,12 +378,9 @@ public class LookupProxyHandler {
             // Connected to backend broker
             long requestId = proxyConnection.newRequestId();
             ByteBuf command;
-            byte[] schemaVersion = null;
-            if (commandGetSchema.hasSchemaVersion()) {
-                schemaVersion = commandGetSchema.getSchemaVersion().toByteArray();
-            }
-            command = Commands.newGetSchema(requestId, topic,
-                    Optional.ofNullable(schemaVersion).map(BytesSchemaVersion::of));
+
+            command = Commands.newGetSchema(requestId, topic, schemaVersion);
+
             clientCnx.sendGetRawSchema(command, requestId).whenComplete((r, t) -> {
                 if (t != null) {
                     log.warn("[{}] Failed to get schema {}: {}", clientAddress, topic, t);
@@ -402,10 +402,7 @@ public class LookupProxyHandler {
 
     }
 
-    /**
-     *  Get default broker service url or discovery an available broker
-     **/
-    private String getBrokerServiceUrl(long clientRequestId) {
+    private String getServiceUrl(long clientRequestId) {
         if (isBlank(brokerServiceURL)) {
             ServiceLookupData availableBroker;
             try {