You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/03/17 16:11:58 UTC

[GitHub] [pulsar] eolivelli commented on a change in pull request #14713: [Proxy] Refactor Proxy code and fix connection stalling by switching to auto read mode

eolivelli commented on a change in pull request #14713:
URL: https://github.com/apache/pulsar/pull/14713#discussion_r829232363



##########
File path: pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
##########
@@ -123,67 +128,52 @@ protected void initChannel(SocketChannel ch) {
                 }
                 ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
                     Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
-                ch.pipeline().addLast("proxyOutboundHandler", new ProxyBackendHandler(config, protocolVersion));
+                ch.pipeline().addLast("proxyOutboundHandler",
+                        new ProxyBackendHandler(config, protocolVersion, remoteHost));
             }
         });
 
-        URI targetBroker;
-        try {
-            // targetBrokerUrl is coming in the "hostname:6650" form, so we need
-            // to extract host and port
-            targetBroker = new URI("pulsar://" + targetBrokerUrl);
-        } catch (URISyntaxException e) {
-            log.warn("[{}] Failed to parse broker url '{}'", inboundChannel, targetBrokerUrl, e);
-            inboundChannel.close();
-            return;
-        }
-
         ChannelFuture f = b.connect(targetBrokerAddress);
         outboundChannel = f.channel();
         f.addListener(future -> {
             if (!future.isSuccess()) {
                 // Close the connection if the connection attempt has failed.
                 log.warn("[{}] Establishing connection to {} ({}) failed. Closing inbound channel.", inboundChannel,
-                        targetBrokerAddress, targetBrokerUrl, future.cause());
+                        targetBrokerAddress, brokerHostAndPort, future.cause());
                 inboundChannel.close();
                 return;
             }
-            final ProxyBackendHandler cnx = (ProxyBackendHandler) outboundChannel.pipeline()
-                    .get("proxyOutboundHandler");
-            cnx.setRemoteHostName(targetBroker.getHost());
-
-            // if enable full parsing feature
-            if (service.getProxyLogLevel() == 2) {
-                //Set a map between inbound and outbound,
-                //so can find inbound by outbound or find outbound by inbound
-                inboundOutboundChannelMap.put(outboundChannel.id(), inboundChannel.id());
-            }
+        });
+    }
 
-            if (!config.isHaProxyProtocolEnabled()) {
-                return;
-            }
+    private String parseHost(String brokerPortAndHost) {

Review comment:
       nit: `parseHost` could be declared `static`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org