You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/04/29 14:21:54 UTC

[pulsar] 02/03: [Proxy] Fix proxy connection leak when inbound connection closes while connecting is in progress (#15366)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 016109032102662445bc2e780e1f28de8b441555
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Fri Apr 29 14:59:32 2022 +0300

    [Proxy] Fix proxy connection leak when inbound connection closes while connecting is in progress (#15366)
    
    * [Proxy] Fix proxy connection leak when inbound connection closes while connecting is in progress
    
    * Add logging when execution is rejected
    
    (cherry picked from commit 4ae070c5b26be3bf92c007ca309bd4adbae5ce93)
---
 .../pulsar/proxy/server/DirectProxyHandler.java    |  19 ++--
 .../pulsar/proxy/server/ProxyConnection.java       | 103 ++++++++++++++++-----
 2 files changed, 92 insertions(+), 30 deletions(-)

diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index e04ac1e4ae8..24802f60a3d 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -80,9 +80,7 @@ public class DirectProxyHandler {
     private final ProxyService service;
     private final Runnable onHandshakeCompleteAction;
 
-    public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, String brokerHostAndPort,
-                              InetSocketAddress targetBrokerAddress, int protocolVersion,
-                              Supplier<SslHandler> sslHandlerSupplier) {
+    public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection) {
         this.service = service;
         this.authentication = proxyConnection.getClientAuthentication();
         this.inboundChannel = proxyConnection.ctx().channel();
@@ -92,6 +90,10 @@ public class DirectProxyHandler {
         this.clientAuthData = proxyConnection.clientAuthData;
         this.clientAuthMethod = proxyConnection.clientAuthMethod;
         this.onHandshakeCompleteAction = proxyConnection::cancelKeepAliveTask;
+    }
+
+    public void connect(String brokerHostAndPort, InetSocketAddress targetBrokerAddress,
+                           int protocolVersion, Supplier<SslHandler> sslHandlerSupplier) {
         ProxyConfiguration config = service.getConfiguration();
 
         // Start the connection attempt.
@@ -208,6 +210,12 @@ public class DirectProxyHandler {
             (byte) 'Y',
     };
 
+    public void close() {
+        if (outboundChannel != null) {
+            outboundChannel.close();
+        }
+    }
+
     enum BackendState {
         Init, HandshakeCompleted
     }
@@ -344,10 +352,7 @@ public class DirectProxyHandler {
             onHandshakeCompleteAction.run();
             startDirectProxying(connected);
 
-            int maxMessageSize =
-                    connected.hasMaxMessageSize() ? connected.getMaxMessageSize() : Commands.INVALID_MAX_MESSAGE_SIZE;
-            inboundChannel.writeAndFlush(Commands.newConnected(connected.getProtocolVersion(), maxMessageSize))
-                    .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+            proxyConnection.brokerConnected(DirectProxyHandler.this, connected);
         }
 
         private void startDirectProxying(CommandConnected connected) {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 5d81c4b803a..39870f62af8 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -19,14 +19,16 @@
 package org.apache.pulsar.proxy.server;
 
 import static com.google.common.base.Preconditions.checkArgument;
-
+import static com.google.common.base.Preconditions.checkState;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.handler.codec.haproxy.HAProxyMessage;
 import io.netty.resolver.dns.DnsNameResolver;
+import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -50,6 +52,7 @@ import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.PulsarHandler;
 import org.apache.pulsar.common.api.proto.CommandAuthResponse;
 import org.apache.pulsar.common.api.proto.CommandConnect;
+import org.apache.pulsar.common.api.proto.CommandConnected;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
 import org.apache.pulsar.common.api.proto.CommandLookupTopic;
 import org.apache.pulsar.common.api.proto.CommandGetSchema;
@@ -114,6 +117,9 @@ public class ProxyConnection extends PulsarHandler {
         // Follow redirects
         ProxyLookupRequests,
 
+        // Connecting to the broker
+        ProxyConnectingToBroker,
+
         // If we are proxying a connection to a specific broker, we
         // are just forwarding data between the 2 connections, without
         // looking into it
@@ -167,8 +173,8 @@ public class ProxyConnection extends PulsarHandler {
     public synchronized void channelInactive(ChannelHandlerContext ctx) throws Exception {
         super.channelInactive(ctx);
 
-        if (directProxyHandler != null && directProxyHandler.outboundChannel != null) {
-            directProxyHandler.outboundChannel.close();
+        if (directProxyHandler != null) {
+            directProxyHandler.close();
             directProxyHandler = null;
         }
 
@@ -189,11 +195,22 @@ public class ProxyConnection extends PulsarHandler {
 
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-        state = State.Closing;
         super.exceptionCaught(ctx, cause);
-        LOG.warn("[{}] Got exception {} : {} {}", remoteAddress, cause.getClass().getSimpleName(), cause.getMessage(),
+        LOG.warn("[{}] Got exception {} : Message: {} State: {}", remoteAddress, cause.getClass().getSimpleName(),
+                cause.getMessage(), state,
                 ClientCnx.isKnownException(cause) ? null : cause);
-        ctx.close();
+        if (state != State.Closed) {
+            state = State.Closing;
+        }
+        if (ctx.channel().isOpen()) {
+            ctx.close();
+        } else {
+            // close connection to broker if that is present
+            if (directProxyHandler != null) {
+                directProxyHandler.close();
+                directProxyHandler = null;
+            }
+        }
     }
 
     @Override
@@ -222,18 +239,26 @@ public class ProxyConnection extends PulsarHandler {
             break;
 
         case ProxyConnectionToBroker:
-            // Pass the buffer to the outbound connection and schedule next read
-            // only if we can write on the connection
-            ProxyService.opsCounter.inc();
-            if (msg instanceof ByteBuf) {
-                int bytes = ((ByteBuf) msg).readableBytes();
-                directProxyHandler.getInboundChannelRequestsRate().recordEvent(bytes);
-                ProxyService.bytesCounter.inc(bytes);
+            if (directProxyHandler != null) {
+                ProxyService.opsCounter.inc();
+                if (msg instanceof ByteBuf) {
+                    int bytes = ((ByteBuf) msg).readableBytes();
+                    directProxyHandler.getInboundChannelRequestsRate().recordEvent(bytes);
+                    ProxyService.bytesCounter.inc(bytes);
+                }
+                directProxyHandler.outboundChannel.writeAndFlush(msg)
+                        .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+            } else {
+                LOG.warn("Received message of type {} while connection to broker is missing in state {}. "
+                                + "Dropping the input message (readable bytes={}).", msg.getClass(), state,
+                        msg instanceof ByteBuf ? ((ByteBuf) msg).readableBytes() : -1);
             }
-            directProxyHandler.outboundChannel.writeAndFlush(msg)
-                    .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
             break;
-
+        case ProxyConnectingToBroker:
+            LOG.warn("Received message of type {} while connecting to broker. "
+                            + "Dropping the input message (readable bytes={}).", msg.getClass(),
+                    msg instanceof ByteBuf ? ((ByteBuf) msg).readableBytes() : -1);
+            break;
         default:
             break;
         }
@@ -279,14 +304,9 @@ public class ProxyConnection extends PulsarHandler {
                 return;
             }
 
+            state = State.ProxyConnectingToBroker;
             brokerProxyValidator.resolveAndCheckTargetAddress(proxyToBrokerUrl)
-                    .thenAcceptAsync(address -> {
-                        // Client already knows which broker to connect. Let's open a
-                        // connection there and just pass bytes in both directions
-                        state = State.ProxyConnectionToBroker;
-                        directProxyHandler = new DirectProxyHandler(service, this, proxyToBrokerUrl, address,
-                                protocolVersionToAdvertise, sslHandlerSupplier);
-                    }, ctx.executor())
+                    .thenAcceptAsync(this::connectToBroker, ctx.executor())
                     .exceptionally(throwable -> {
                         if (throwable instanceof TargetAddressDeniedException
                                 || throwable.getCause() instanceof TargetAddressDeniedException) {
@@ -319,6 +339,43 @@ public class ProxyConnection extends PulsarHandler {
         }
     }
 
+    private void handleBrokerConnected(DirectProxyHandler directProxyHandler, CommandConnected connected) {
+        checkState(ctx.executor().inEventLoop(), "This method should be called in the event loop");
+        if (state == State.ProxyConnectingToBroker && ctx.channel().isOpen() && this.directProxyHandler == null) {
+            this.directProxyHandler = directProxyHandler;
+            state = State.ProxyConnectionToBroker;
+            int maxMessageSize =
+                    connected.hasMaxMessageSize() ? connected.getMaxMessageSize() : Commands.INVALID_MAX_MESSAGE_SIZE;
+            ctx.writeAndFlush(Commands.newConnected(connected.getProtocolVersion(), maxMessageSize))
+                    .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+        } else {
+            LOG.warn("[{}] Channel is {}. ProxyConnection is in {}. "
+                            + "Closing connection to broker '{}'.",
+                    remoteAddress, ctx.channel().isOpen() ? "open" : "already closed",
+                    state != State.ProxyConnectingToBroker ? "invalid state " + state : "state " + state,
+                    proxyToBrokerUrl);
+            directProxyHandler.close();
+            ctx.close();
+        }
+    }
+
+    private void connectToBroker(InetSocketAddress brokerAddress) {
+        checkState(ctx.executor().inEventLoop(), "This method should be called in the event loop");
+        DirectProxyHandler directProxyHandler = new DirectProxyHandler(service, this);
+        directProxyHandler.connect(proxyToBrokerUrl, brokerAddress,
+                protocolVersionToAdvertise, sslHandlerSupplier);
+    }
+
+    public void brokerConnected(DirectProxyHandler directProxyHandler, CommandConnected connected) {
+        try {
+            final CommandConnected finalConnected = new CommandConnected().copyFrom(connected);
+            ctx.executor().submit(() -> handleBrokerConnected(directProxyHandler, finalConnected));
+        } catch (RejectedExecutionException e) {
+            LOG.error("Event loop was already closed. Closing broker connection.", e);
+            directProxyHandler.close();
+        }
+    }
+
     // According to auth result, send newConnected or newAuthChallenge command.
     private void doAuthentication(AuthData clientData) throws Exception {
         AuthData brokerData = authState.authenticate(clientData);