You are viewing a plain-text version of this content. The canonical link to the original was a hyperlink that is not preserved in this copy.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/06/01 14:43:12 UTC
[pulsar] 05/10: [Proxy] Fix proxy connection leak when inbound connection closes while connecting is in progress (#15366)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 8115076c5111f7bcddb010639d787c38cf0b8f82
Author: Lari Hotari <lh...@apache.org>
AuthorDate: Thu Apr 28 19:43:13 2022 +0300
[Proxy] Fix proxy connection leak when inbound connection closes while connecting is in progress (#15366)
- backports https://github.com/apache/pulsar/pull/15366 to branch-2.7
(cherry picked from commit 4621ca63fcaabf3a0faefd487434dbd97c1d8859)
---
.../pulsar/proxy/server/DirectProxyHandler.java | 19 ++--
.../pulsar/proxy/server/ProxyConnection.java | 103 ++++++++++++++++-----
2 files changed, 92 insertions(+), 30 deletions(-)
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index 8fa7787215d..64ce8c68b27 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -80,9 +80,7 @@ public class DirectProxyHandler {
private final ProxyService service;
private final Runnable onHandshakeCompleteAction;
- public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, String brokerHostAndPort,
- InetSocketAddress targetBrokerAddress, int protocolVersion,
- Supplier<SslHandler> sslHandlerSupplier) {
+ public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection) {
this.service = service;
this.authentication = proxyConnection.getClientAuthentication();
this.inboundChannel = proxyConnection.ctx().channel();
@@ -92,6 +90,10 @@ public class DirectProxyHandler {
this.clientAuthData = proxyConnection.clientAuthData;
this.clientAuthMethod = proxyConnection.clientAuthMethod;
this.onHandshakeCompleteAction = proxyConnection::cancelKeepAliveTask;
+ }
+
+ public void connect(String brokerHostAndPort, InetSocketAddress targetBrokerAddress,
+ int protocolVersion, Supplier<SslHandler> sslHandlerSupplier) {
ProxyConfiguration config = service.getConfiguration();
// Start the connection attempt.
@@ -208,6 +210,12 @@ public class DirectProxyHandler {
(byte) 'Y',
};
+ public void close() {
+ if (outboundChannel != null) {
+ outboundChannel.close();
+ }
+ }
+
enum BackendState {
Init, HandshakeCompleted
}
@@ -344,10 +352,7 @@ public class DirectProxyHandler {
onHandshakeCompleteAction.run();
startDirectProxying(connected);
- int maxMessageSize =
- connected.hasMaxMessageSize() ? connected.getMaxMessageSize() : Commands.INVALID_MAX_MESSAGE_SIZE;
- inboundChannel.writeAndFlush(Commands.newConnected(connected.getProtocolVersion(), maxMessageSize))
- .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+ proxyConnection.brokerConnected(DirectProxyHandler.this, connected);
}
private void startDirectProxying(CommandConnected connected) {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 395d16bbbd8..fbf604866b8 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -19,14 +19,16 @@
package org.apache.pulsar.proxy.server;
import static com.google.common.base.Preconditions.checkArgument;
-
+import static com.google.common.base.Preconditions.checkState;
import io.netty.channel.ChannelFutureListener;
import io.netty.handler.codec.haproxy.HAProxyMessage;
import io.netty.resolver.dns.DnsNameResolver;
+import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -51,6 +53,7 @@ import org.apache.pulsar.common.protocol.PulsarHandler;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnected;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema;
@@ -113,6 +116,9 @@ public class ProxyConnection extends PulsarHandler {
// Follow redirects
ProxyLookupRequests,
+ // Connecting to the broker
+ ProxyConnectingToBroker,
+
// If we are proxying a connection to a specific broker, we
// are just forwarding data between the 2 connections, without
// looking into it
@@ -166,8 +172,8 @@ public class ProxyConnection extends PulsarHandler {
public synchronized void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
- if (directProxyHandler != null && directProxyHandler.outboundChannel != null) {
- directProxyHandler.outboundChannel.close();
+ if (directProxyHandler != null) {
+ directProxyHandler.close();
directProxyHandler = null;
}
@@ -188,11 +194,22 @@ public class ProxyConnection extends PulsarHandler {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- state = State.Closing;
super.exceptionCaught(ctx, cause);
- LOG.warn("[{}] Got exception {} : {} {}", remoteAddress, cause.getClass().getSimpleName(), cause.getMessage(),
+ LOG.warn("[{}] Got exception {} : Message: {} State: {}", remoteAddress, cause.getClass().getSimpleName(),
+ cause.getMessage(), state,
ClientCnx.isKnownException(cause) ? null : cause);
- ctx.close();
+ if (state != State.Closed) {
+ state = State.Closing;
+ }
+ if (ctx.channel().isOpen()) {
+ ctx.close();
+ } else {
+ // close connection to broker if that is present
+ if (directProxyHandler != null) {
+ directProxyHandler.close();
+ directProxyHandler = null;
+ }
+ }
}
@Override
@@ -221,18 +238,26 @@ public class ProxyConnection extends PulsarHandler {
break;
case ProxyConnectionToBroker:
- // Pass the buffer to the outbound connection and schedule next read
- // only if we can write on the connection
- ProxyService.opsCounter.inc();
- if (msg instanceof ByteBuf) {
- int bytes = ((ByteBuf) msg).readableBytes();
- directProxyHandler.getInboundChannelRequestsRate().recordEvent(bytes);
- ProxyService.bytesCounter.inc(bytes);
+ if (directProxyHandler != null) {
+ ProxyService.opsCounter.inc();
+ if (msg instanceof ByteBuf) {
+ int bytes = ((ByteBuf) msg).readableBytes();
+ directProxyHandler.getInboundChannelRequestsRate().recordEvent(bytes);
+ ProxyService.bytesCounter.inc(bytes);
+ }
+ directProxyHandler.outboundChannel.writeAndFlush(msg)
+ .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+ } else {
+ LOG.warn("Received message of type {} while connection to broker is missing in state {}. "
+ + "Dropping the input message (readable bytes={}).", msg.getClass(), state,
+ msg instanceof ByteBuf ? ((ByteBuf) msg).readableBytes() : -1);
}
- directProxyHandler.outboundChannel.writeAndFlush(msg)
- .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
break;
-
+ case ProxyConnectingToBroker:
+ LOG.warn("Received message of type {} while connecting to broker. "
+ + "Dropping the input message (readable bytes={}).", msg.getClass(),
+ msg instanceof ByteBuf ? ((ByteBuf) msg).readableBytes() : -1);
+ break;
default:
break;
}
@@ -278,14 +303,9 @@ public class ProxyConnection extends PulsarHandler {
return;
}
+ state = State.ProxyConnectingToBroker;
brokerProxyValidator.resolveAndCheckTargetAddress(proxyToBrokerUrl)
- .thenAcceptAsync(address -> {
- // Client already knows which broker to connect. Let's open a
- // connection there and just pass bytes in both directions
- state = State.ProxyConnectionToBroker;
- directProxyHandler = new DirectProxyHandler(service, this, proxyToBrokerUrl, address,
- protocolVersionToAdvertise, sslHandlerSupplier);
- }, ctx.executor())
+ .thenAcceptAsync(this::connectToBroker, ctx.executor())
.exceptionally(throwable -> {
if (throwable instanceof TargetAddressDeniedException
|| throwable.getCause() instanceof TargetAddressDeniedException) {
@@ -318,6 +338,43 @@ public class ProxyConnection extends PulsarHandler {
}
}
+ private void handleBrokerConnected(DirectProxyHandler directProxyHandler, CommandConnected connected) {
+ checkState(ctx.executor().inEventLoop(), "This method should be called in the event loop");
+ if (state == State.ProxyConnectingToBroker && ctx.channel().isOpen() && this.directProxyHandler == null) {
+ this.directProxyHandler = directProxyHandler;
+ state = State.ProxyConnectionToBroker;
+ int maxMessageSize =
+ connected.hasMaxMessageSize() ? connected.getMaxMessageSize() : Commands.INVALID_MAX_MESSAGE_SIZE;
+ ctx.writeAndFlush(Commands.newConnected(connected.getProtocolVersion(), maxMessageSize))
+ .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+ } else {
+ LOG.warn("[{}] Channel is {}. ProxyConnection is in {}. "
+ + "Closing connection to broker '{}'.",
+ remoteAddress, ctx.channel().isOpen() ? "open" : "already closed",
+ state != State.ProxyConnectingToBroker ? "invalid state " + state : "state " + state,
+ proxyToBrokerUrl);
+ directProxyHandler.close();
+ ctx.close();
+ }
+ }
+
+ private void connectToBroker(InetSocketAddress brokerAddress) {
+ checkState(ctx.executor().inEventLoop(), "This method should be called in the event loop");
+ DirectProxyHandler directProxyHandler = new DirectProxyHandler(service, this);
+ directProxyHandler.connect(proxyToBrokerUrl, brokerAddress,
+ protocolVersionToAdvertise, sslHandlerSupplier);
+ }
+
+ public void brokerConnected(DirectProxyHandler directProxyHandler, CommandConnected connected) {
+ try {
+ final CommandConnected finalConnected = connected.toBuilder().build();
+ ctx.executor().submit(() -> handleBrokerConnected(directProxyHandler, finalConnected));
+ } catch (RejectedExecutionException e) {
+ LOG.error("Event loop was already closed. Closing broker connection.", e);
+ directProxyHandler.close();
+ }
+ }
+
// According to auth result, send newConnected or newAuthChallenge command.
private void doAuthentication(AuthData clientData) throws Exception {
AuthData brokerData = authState.authenticate(clientData);