You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/06/01 14:43:14 UTC
[pulsar] 07/10: [Proxy] Remove unnecessary blocking DNS lookup in LookupProxyHandler (#15415)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 462d44f906fa4dd873c3e4767fcdce84bea20f48
Author: Lari Hotari <lh...@apache.org>
AuthorDate: Wed May 4 08:45:00 2022 +0300
[Proxy] Remove unnecessary blocking DNS lookup in LookupProxyHandler (#15415)
* [Proxy] Remove unnecessary blocking DNS lookup in LookupProxyHandler
* Use existing code pattern for creating address
(cherry picked from commit 7373a51690d728475d47846bfbcca4fa64f2e228)
(cherry picked from commit 5980cdc109736ed7e0df7df0e031dcde73e99536)
---
.../pulsar/proxy/server/LookupProxyHandler.java | 45 ++++++++++------------
1 file changed, 21 insertions(+), 24 deletions(-)
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
index 82300a3ae7f..a8525e4da98 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
@@ -37,6 +37,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetad
import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
+import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -224,20 +225,16 @@ public class LookupProxyHandler {
private void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata partitionMetadata,
long clientRequestId) {
TopicName topicName = TopicName.get(partitionMetadata.getTopic());
- URI brokerURI;
- try {
- String availableBrokerServiceURL = getBrokerServiceUrl(clientRequestId);
- if (availableBrokerServiceURL == null) {
- log.warn("No available broker for {} to lookup partition metadata", topicName);
- return;
- }
- brokerURI = new URI(availableBrokerServiceURL);
- } catch (URISyntaxException e) {
- proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.MetadataError,
- e.getMessage(), clientRequestId));
+
+ String serviceUrl = getServiceUrl(clientRequestId);
+ if (serviceUrl == null) {
+ log.warn("No available broker for {} to lookup partition metadata", topicName);
+ return;
+ }
+ InetSocketAddress addr = getAddr(serviceUrl, clientRequestId);
+ if (addr == null) {
return;
}
- InetSocketAddress addr = new InetSocketAddress(brokerURI.getHost(), brokerURI.getPort());
if (log.isDebugEnabled()) {
log.debug("Getting connections to '{}' for Looking up topic '{}' with clientReq Id '{}'", addr,
@@ -293,7 +290,7 @@ public class LookupProxyHandler {
private void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace,
long clientRequestId) {
- String serviceUrl = getBrokerServiceUrl(clientRequestId);
+ String serviceUrl = getServiceUrl(clientRequestId);
if(!StringUtils.isNotBlank(serviceUrl)) {
return;
@@ -355,8 +352,14 @@ public class LookupProxyHandler {
}
final long clientRequestId = commandGetSchema.getRequestId();
- String serviceUrl = getBrokerServiceUrl(clientRequestId);
+ String serviceUrl = getServiceUrl(clientRequestId);
String topic = commandGetSchema.getTopic();
+ Optional<SchemaVersion> schemaVersion;
+ if (commandGetSchema.hasSchemaVersion()) {
+ schemaVersion = Optional.of(commandGetSchema.getSchemaVersion().toByteArray()).map(BytesSchemaVersion::of);
+ } else {
+ schemaVersion = Optional.empty();
+ }
if(!StringUtils.isNotBlank(serviceUrl)) {
return;
@@ -375,12 +378,9 @@ public class LookupProxyHandler {
// Connected to backend broker
long requestId = proxyConnection.newRequestId();
ByteBuf command;
- byte[] schemaVersion = null;
- if (commandGetSchema.hasSchemaVersion()) {
- schemaVersion = commandGetSchema.getSchemaVersion().toByteArray();
- }
- command = Commands.newGetSchema(requestId, topic,
- Optional.ofNullable(schemaVersion).map(BytesSchemaVersion::of));
+
+ command = Commands.newGetSchema(requestId, topic, schemaVersion);
+
clientCnx.sendGetRawSchema(command, requestId).whenComplete((r, t) -> {
if (t != null) {
log.warn("[{}] Failed to get schema {}: {}", clientAddress, topic, t);
@@ -402,10 +402,7 @@ public class LookupProxyHandler {
}
- /**
- * Get default broker service url or discovery an available broker
- **/
- private String getBrokerServiceUrl(long clientRequestId) {
+ private String getServiceUrl(long clientRequestId) {
if (isBlank(brokerServiceURL)) {
ServiceLookupData availableBroker;
try {