You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/06/01 14:43:12 UTC

[pulsar] 05/10: [Proxy] Fix proxy connection leak when inbound connection closes while connecting is in progress (#15366)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8115076c5111f7bcddb010639d787c38cf0b8f82
Author: Lari Hotari <lh...@apache.org>
AuthorDate: Thu Apr 28 19:43:13 2022 +0300

    [Proxy] Fix proxy connection leak when inbound connection closes while connecting is in progress (#15366)
    
    - backports https://github.com/apache/pulsar/pull/15366 to branch-2.7
    
    (cherry picked from commit 4621ca63fcaabf3a0faefd487434dbd97c1d8859)
---
 .../pulsar/proxy/server/DirectProxyHandler.java    |  19 ++--
 .../pulsar/proxy/server/ProxyConnection.java       | 103 ++++++++++++++++-----
 2 files changed, 92 insertions(+), 30 deletions(-)

diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index 8fa7787215d..64ce8c68b27 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -80,9 +80,7 @@ public class DirectProxyHandler {
     private final ProxyService service;
     private final Runnable onHandshakeCompleteAction;
 
-    public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, String brokerHostAndPort,
-                              InetSocketAddress targetBrokerAddress, int protocolVersion,
-                              Supplier<SslHandler> sslHandlerSupplier) {
+    public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection) {
         this.service = service;
         this.authentication = proxyConnection.getClientAuthentication();
         this.inboundChannel = proxyConnection.ctx().channel();
@@ -92,6 +90,10 @@ public class DirectProxyHandler {
         this.clientAuthData = proxyConnection.clientAuthData;
         this.clientAuthMethod = proxyConnection.clientAuthMethod;
         this.onHandshakeCompleteAction = proxyConnection::cancelKeepAliveTask;
+    }
+
+    public void connect(String brokerHostAndPort, InetSocketAddress targetBrokerAddress,
+                           int protocolVersion, Supplier<SslHandler> sslHandlerSupplier) {
         ProxyConfiguration config = service.getConfiguration();
 
         // Start the connection attempt.
@@ -208,6 +210,12 @@ public class DirectProxyHandler {
             (byte) 'Y',
     };
 
+    public void close() {
+        if (outboundChannel != null) {
+            outboundChannel.close();
+        }
+    }
+
     enum BackendState {
         Init, HandshakeCompleted
     }
@@ -344,10 +352,7 @@ public class DirectProxyHandler {
             onHandshakeCompleteAction.run();
             startDirectProxying(connected);
 
-            int maxMessageSize =
-                    connected.hasMaxMessageSize() ? connected.getMaxMessageSize() : Commands.INVALID_MAX_MESSAGE_SIZE;
-            inboundChannel.writeAndFlush(Commands.newConnected(connected.getProtocolVersion(), maxMessageSize))
-                    .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+            proxyConnection.brokerConnected(DirectProxyHandler.this, connected);
         }
 
         private void startDirectProxying(CommandConnected connected) {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 395d16bbbd8..fbf604866b8 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -19,14 +19,16 @@
 package org.apache.pulsar.proxy.server;
 
 import static com.google.common.base.Preconditions.checkArgument;
-
+import static com.google.common.base.Preconditions.checkState;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.handler.codec.haproxy.HAProxyMessage;
 import io.netty.resolver.dns.DnsNameResolver;
+import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -51,6 +53,7 @@ import org.apache.pulsar.common.protocol.PulsarHandler;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnected;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema;
@@ -113,6 +116,9 @@ public class ProxyConnection extends PulsarHandler {
         // Follow redirects
         ProxyLookupRequests,
 
+        // Connecting to the broker
+        ProxyConnectingToBroker,
+
         // If we are proxying a connection to a specific broker, we
         // are just forwarding data between the 2 connections, without
         // looking into it
@@ -166,8 +172,8 @@ public class ProxyConnection extends PulsarHandler {
     public synchronized void channelInactive(ChannelHandlerContext ctx) throws Exception {
         super.channelInactive(ctx);
 
-        if (directProxyHandler != null && directProxyHandler.outboundChannel != null) {
-            directProxyHandler.outboundChannel.close();
+        if (directProxyHandler != null) {
+            directProxyHandler.close();
             directProxyHandler = null;
         }
 
@@ -188,11 +194,22 @@ public class ProxyConnection extends PulsarHandler {
 
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-        state = State.Closing;
         super.exceptionCaught(ctx, cause);
-        LOG.warn("[{}] Got exception {} : {} {}", remoteAddress, cause.getClass().getSimpleName(), cause.getMessage(),
+        LOG.warn("[{}] Got exception {} : Message: {} State: {}", remoteAddress, cause.getClass().getSimpleName(),
+                cause.getMessage(), state,
                 ClientCnx.isKnownException(cause) ? null : cause);
-        ctx.close();
+        if (state != State.Closed) {
+            state = State.Closing;
+        }
+        if (ctx.channel().isOpen()) {
+            ctx.close();
+        } else {
+            // close connection to broker if that is present
+            if (directProxyHandler != null) {
+                directProxyHandler.close();
+                directProxyHandler = null;
+            }
+        }
     }
 
     @Override
@@ -221,18 +238,26 @@ public class ProxyConnection extends PulsarHandler {
             break;
 
         case ProxyConnectionToBroker:
-            // Pass the buffer to the outbound connection and schedule next read
-            // only if we can write on the connection
-            ProxyService.opsCounter.inc();
-            if (msg instanceof ByteBuf) {
-                int bytes = ((ByteBuf) msg).readableBytes();
-                directProxyHandler.getInboundChannelRequestsRate().recordEvent(bytes);
-                ProxyService.bytesCounter.inc(bytes);
+            if (directProxyHandler != null) {
+                ProxyService.opsCounter.inc();
+                if (msg instanceof ByteBuf) {
+                    int bytes = ((ByteBuf) msg).readableBytes();
+                    directProxyHandler.getInboundChannelRequestsRate().recordEvent(bytes);
+                    ProxyService.bytesCounter.inc(bytes);
+                }
+                directProxyHandler.outboundChannel.writeAndFlush(msg)
+                        .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+            } else {
+                LOG.warn("Received message of type {} while connection to broker is missing in state {}. "
+                                + "Dropping the input message (readable bytes={}).", msg.getClass(), state,
+                        msg instanceof ByteBuf ? ((ByteBuf) msg).readableBytes() : -1);
             }
-            directProxyHandler.outboundChannel.writeAndFlush(msg)
-                    .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
             break;
-
+        case ProxyConnectingToBroker:
+            LOG.warn("Received message of type {} while connecting to broker. "
+                            + "Dropping the input message (readable bytes={}).", msg.getClass(),
+                    msg instanceof ByteBuf ? ((ByteBuf) msg).readableBytes() : -1);
+            break;
         default:
             break;
         }
@@ -278,14 +303,9 @@ public class ProxyConnection extends PulsarHandler {
                 return;
             }
 
+            state = State.ProxyConnectingToBroker;
             brokerProxyValidator.resolveAndCheckTargetAddress(proxyToBrokerUrl)
-                    .thenAcceptAsync(address -> {
-                        // Client already knows which broker to connect. Let's open a
-                        // connection there and just pass bytes in both directions
-                        state = State.ProxyConnectionToBroker;
-                        directProxyHandler = new DirectProxyHandler(service, this, proxyToBrokerUrl, address,
-                                protocolVersionToAdvertise, sslHandlerSupplier);
-                    }, ctx.executor())
+                    .thenAcceptAsync(this::connectToBroker, ctx.executor())
                     .exceptionally(throwable -> {
                         if (throwable instanceof TargetAddressDeniedException
                                 || throwable.getCause() instanceof TargetAddressDeniedException) {
@@ -318,6 +338,43 @@ public class ProxyConnection extends PulsarHandler {
         }
     }
 
+    private void handleBrokerConnected(DirectProxyHandler directProxyHandler, CommandConnected connected) {
+        checkState(ctx.executor().inEventLoop(), "This method should be called in the event loop");
+        if (state == State.ProxyConnectingToBroker && ctx.channel().isOpen() && this.directProxyHandler == null) {
+            this.directProxyHandler = directProxyHandler;
+            state = State.ProxyConnectionToBroker;
+            int maxMessageSize =
+                    connected.hasMaxMessageSize() ? connected.getMaxMessageSize() : Commands.INVALID_MAX_MESSAGE_SIZE;
+            ctx.writeAndFlush(Commands.newConnected(connected.getProtocolVersion(), maxMessageSize))
+                    .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+        } else {
+            LOG.warn("[{}] Channel is {}. ProxyConnection is in {}. "
+                            + "Closing connection to broker '{}'.",
+                    remoteAddress, ctx.channel().isOpen() ? "open" : "already closed",
+                    state != State.ProxyConnectingToBroker ? "invalid state " + state : "state " + state,
+                    proxyToBrokerUrl);
+            directProxyHandler.close();
+            ctx.close();
+        }
+    }
+
+    private void connectToBroker(InetSocketAddress brokerAddress) {
+        checkState(ctx.executor().inEventLoop(), "This method should be called in the event loop");
+        DirectProxyHandler directProxyHandler = new DirectProxyHandler(service, this);
+        directProxyHandler.connect(proxyToBrokerUrl, brokerAddress,
+                protocolVersionToAdvertise, sslHandlerSupplier);
+    }
+
+    public void brokerConnected(DirectProxyHandler directProxyHandler, CommandConnected connected) {
+        try {
+            final CommandConnected finalConnected = connected.toBuilder().build();
+            ctx.executor().submit(() -> handleBrokerConnected(directProxyHandler, finalConnected));
+        } catch (RejectedExecutionException e) {
+            LOG.error("Event loop was already closed. Closing broker connection.", e);
+            directProxyHandler.close();
+        }
+    }
+
     // According to auth result, send newConnected or newAuthChallenge command.
     private void doAuthentication(AuthData clientData) throws Exception {
         AuthData brokerData = authState.authenticate(clientData);