You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/04/28 08:46:47 UTC

[GitHub] [pulsar] lhotari opened a new pull request, #15366: [Proxy] Fix proxy connection leak when inbound connection closes while connecting is in progress

lhotari opened a new pull request, #15366:
URL: https://github.com/apache/pulsar/pull/15366

   ### Motivation
   
   There's a remaining proxy connection leak which was introduced by the changes #14078 made to fix another connection leak .
   
   ### Modifications
   
   Use a better state model to prevent leaks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lhotari commented on a diff in pull request #15366: [Proxy] Fix proxy connection leak when inbound connection closes while connecting is in progress

Posted by GitBox <gi...@apache.org>.
lhotari commented on code in PR #15366:
URL: https://github.com/apache/pulsar/pull/15366#discussion_r861129571


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -216,18 +234,26 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exce
             break;
 
         case ProxyConnectionToBroker:
-            // Pass the buffer to the outbound connection and schedule next read
-            // only if we can write on the connection
-            ProxyService.OPS_COUNTER.inc();
-            if (msg instanceof ByteBuf) {
-                int bytes = ((ByteBuf) msg).readableBytes();
-                directProxyHandler.getInboundChannelRequestsRate().recordEvent(bytes);
-                ProxyService.BYTES_COUNTER.inc(bytes);
+            if (directProxyHandler != null) {
+                ProxyService.OPS_COUNTER.inc();
+                if (msg instanceof ByteBuf) {
+                    int bytes = ((ByteBuf) msg).readableBytes();
+                    directProxyHandler.getInboundChannelRequestsRate().recordEvent(bytes);
+                    ProxyService.BYTES_COUNTER.inc(bytes);
+                }
+                directProxyHandler.outboundChannel.writeAndFlush(msg)
+                        .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+            } else {
+                LOG.warn("Received message of type {} while connection to broker is missing in state {}. "
+                                + "Dropping the input message (readable bytes={}).", msg.getClass(), state,
+                        msg instanceof ByteBuf ? ((ByteBuf) msg).readableBytes() : -1);
             }
-            directProxyHandler.outboundChannel.writeAndFlush(msg)
-                    .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
             break;
-
+        case ProxyConnectingToBroker:
+            LOG.warn("Received message of type {} while connecting to broker. "
+                            + "Dropping the input message (readable bytes={}).", msg.getClass(),
+                    msg instanceof ByteBuf ? ((ByteBuf) msg).readableBytes() : -1);

Review Comment:
   Disconnecting might be the correct solution for handling misbehaving clients eventually.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] michaeljmarshall commented on a diff in pull request #15366: [Proxy] Fix proxy connection leak when inbound connection closes while connecting is in progress

Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on code in PR #15366:
URL: https://github.com/apache/pulsar/pull/15366#discussion_r861120267


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -313,6 +334,43 @@ private synchronized void completeConnect(AuthData clientData) throws PulsarClie
         }
     }
 
+    private void handleBrokerConnected(DirectProxyHandler directProxyHandler, CommandConnected connected) {
+        checkState(ctx.executor().inEventLoop(), "This method should be called in the event loop");
+        if (state == State.ProxyConnectingToBroker && ctx.channel().isOpen() && this.directProxyHandler == null) {
+            this.directProxyHandler = directProxyHandler;
+            state = State.ProxyConnectionToBroker;
+            int maxMessageSize =
+                    connected.hasMaxMessageSize() ? connected.getMaxMessageSize() : Commands.INVALID_MAX_MESSAGE_SIZE;
+            ctx.writeAndFlush(Commands.newConnected(connected.getProtocolVersion(), maxMessageSize))
+                    .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+        } else {
+            LOG.warn("[{}] Channel is {}. ProxyConnection is in {}. "
+                            + "Closing connection to broker '{}'.",
+                    remoteAddress, ctx.channel().isOpen() ? "open" : "already closed",
+                    state != State.ProxyConnectingToBroker ? "invalid state " + state : "state " + state,
+                    proxyToBrokerUrl);
+            directProxyHandler.close();

Review Comment:
   Thanks, I missed that nuance.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lhotari commented on a diff in pull request #15366: [Proxy] Fix proxy connection leak when inbound connection closes while connecting is in progress

Posted by GitBox <gi...@apache.org>.
lhotari commented on code in PR #15366:
URL: https://github.com/apache/pulsar/pull/15366#discussion_r860800580


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -313,6 +334,42 @@ private synchronized void completeConnect(AuthData clientData) throws PulsarClie
         }
     }
 
+    private void handleBrokerConnected(DirectProxyHandler directProxyHandler, CommandConnected connected) {
+        checkState(ctx.executor().inEventLoop(), "This method should be called in the event loop");
+        if (state == State.ProxyConnectingToBroker && ctx.channel().isOpen() && this.directProxyHandler == null) {
+            this.directProxyHandler = directProxyHandler;
+            state = State.ProxyConnectionToBroker;
+            int maxMessageSize =
+                    connected.hasMaxMessageSize() ? connected.getMaxMessageSize() : Commands.INVALID_MAX_MESSAGE_SIZE;
+            ctx.writeAndFlush(Commands.newConnected(connected.getProtocolVersion(), maxMessageSize))
+                    .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+        } else {
+            LOG.warn("[{}] Channel is {}. ProxyConnection is in {}. "
+                            + "Closing connection to broker '{}'.",
+                    remoteAddress, ctx.channel().isOpen() ? "open" : "already closed",
+                    state != State.ProxyConnectingToBroker ? "invalid state " + state : "state " + state,
+                    proxyToBrokerUrl);
+            directProxyHandler.close();
+            ctx.close();
+        }
+    }
+
+    private void connectToBroker(InetSocketAddress brokerAddress) {
+        checkState(ctx.executor().inEventLoop(), "This method should be called in the event loop");
+        DirectProxyHandler directProxyHandler = new DirectProxyHandler(service, this);
+        directProxyHandler.connect(proxyToBrokerUrl, brokerAddress,
+                protocolVersionToAdvertise, sslHandlerSupplier);
+    }
+
+    public void brokerConnected(DirectProxyHandler directProxyHandler, CommandConnected connected) {
+        try {
+            final CommandConnected finalConnected = new CommandConnected().copyFrom(connected);
+            ctx.executor().submit(() -> handleBrokerConnected(directProxyHandler, finalConnected));
+        } catch (RejectedExecutionException e) {
+            directProxyHandler.close();

Review Comment:
   yes, I'll do that



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lhotari commented on a diff in pull request #15366: [Proxy] Fix proxy connection leak when inbound connection closes while connecting is in progress

Posted by GitBox <gi...@apache.org>.
lhotari commented on code in PR #15366:
URL: https://github.com/apache/pulsar/pull/15366#discussion_r861030068


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -216,18 +234,26 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exce
             break;
 
         case ProxyConnectionToBroker:
-            // Pass the buffer to the outbound connection and schedule next read
-            // only if we can write on the connection
-            ProxyService.OPS_COUNTER.inc();
-            if (msg instanceof ByteBuf) {
-                int bytes = ((ByteBuf) msg).readableBytes();
-                directProxyHandler.getInboundChannelRequestsRate().recordEvent(bytes);
-                ProxyService.BYTES_COUNTER.inc(bytes);
+            if (directProxyHandler != null) {
+                ProxyService.OPS_COUNTER.inc();
+                if (msg instanceof ByteBuf) {
+                    int bytes = ((ByteBuf) msg).readableBytes();
+                    directProxyHandler.getInboundChannelRequestsRate().recordEvent(bytes);
+                    ProxyService.BYTES_COUNTER.inc(bytes);
+                }
+                directProxyHandler.outboundChannel.writeAndFlush(msg)

Review Comment:
   Actually there were dependencies to outboundChannel from ProxyStats too. I think it's better to do that refactoring in a separate PR since it results in quite a few changes. I did an experiment with https://github.com/lhotari/pulsar/commit/7d843d1a2755be26068fd561d64677e9c68c869f , but that doesn't include all the required changes. There would be the ProxyStats class to handle too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lhotari commented on a diff in pull request #15366: [Proxy] Fix proxy connection leak when inbound connection closes while connecting is in progress

Posted by GitBox <gi...@apache.org>.
lhotari commented on code in PR #15366:
URL: https://github.com/apache/pulsar/pull/15366#discussion_r861113664


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -216,18 +234,26 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exce
             break;
 
         case ProxyConnectionToBroker:
-            // Pass the buffer to the outbound connection and schedule next read
-            // only if we can write on the connection
-            ProxyService.OPS_COUNTER.inc();
-            if (msg instanceof ByteBuf) {
-                int bytes = ((ByteBuf) msg).readableBytes();
-                directProxyHandler.getInboundChannelRequestsRate().recordEvent(bytes);
-                ProxyService.BYTES_COUNTER.inc(bytes);
+            if (directProxyHandler != null) {
+                ProxyService.OPS_COUNTER.inc();
+                if (msg instanceof ByteBuf) {
+                    int bytes = ((ByteBuf) msg).readableBytes();
+                    directProxyHandler.getInboundChannelRequestsRate().recordEvent(bytes);
+                    ProxyService.BYTES_COUNTER.inc(bytes);
+                }
+                directProxyHandler.outboundChannel.writeAndFlush(msg)
+                        .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+            } else {
+                LOG.warn("Received message of type {} while connection to broker is missing in state {}. "
+                                + "Dropping the input message (readable bytes={}).", msg.getClass(), state,
+                        msg instanceof ByteBuf ? ((ByteBuf) msg).readableBytes() : -1);
             }
-            directProxyHandler.outboundChannel.writeAndFlush(msg)
-                    .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
             break;
-
+        case ProxyConnectingToBroker:
+            LOG.warn("Received message of type {} while connecting to broker. "
+                            + "Dropping the input message (readable bytes={}).", msg.getClass(),
+                    msg instanceof ByteBuf ? ((ByteBuf) msg).readableBytes() : -1);

Review Comment:
   I'm assuming that the client is misbehaving if it's sending messages before it receives a response. Pulsar binary protocol spec about connecting: https://pulsar.apache.org/docs/en/develop-binary-protocol/#connection-establishment



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lhotari commented on a diff in pull request #15366: [Proxy] Fix proxy connection leak when inbound connection closes while connecting is in progress

Posted by GitBox <gi...@apache.org>.
lhotari commented on code in PR #15366:
URL: https://github.com/apache/pulsar/pull/15366#discussion_r860797741


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -183,11 +190,22 @@ public synchronized void channelInactive(ChannelHandlerContext ctx) throws Excep
 
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-        state = State.Closing;
         super.exceptionCaught(ctx, cause);
-        LOG.warn("[{}] Got exception {} : {} {}", remoteAddress, cause.getClass().getSimpleName(), cause.getMessage(),
+        LOG.warn("[{}] Got exception {} : Message: {} State: {}", remoteAddress, cause.getClass().getSimpleName(),
+                cause.getMessage(), state,
                 ClientCnx.isKnownException(cause) ? null : cause);
-        ctx.close();
+        if (state != State.Closed) {
+            state = State.Closing;
+        }
+        if (ctx.channel().isOpen()) {

Review Comment:
   it could be called, but it it's already closed, it might not close the directProxyHandler (if that happens to be present). The else branch is the reason for adding this logic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lhotari commented on a diff in pull request #15366: [Proxy] Fix proxy connection leak when inbound connection closes while connecting is in progress

Posted by GitBox <gi...@apache.org>.
lhotari commented on code in PR #15366:
URL: https://github.com/apache/pulsar/pull/15366#discussion_r861128798


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -216,18 +234,26 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exce
             break;
 
         case ProxyConnectionToBroker:
-            // Pass the buffer to the outbound connection and schedule next read
-            // only if we can write on the connection
-            ProxyService.OPS_COUNTER.inc();
-            if (msg instanceof ByteBuf) {
-                int bytes = ((ByteBuf) msg).readableBytes();
-                directProxyHandler.getInboundChannelRequestsRate().recordEvent(bytes);
-                ProxyService.BYTES_COUNTER.inc(bytes);
+            if (directProxyHandler != null) {
+                ProxyService.OPS_COUNTER.inc();
+                if (msg instanceof ByteBuf) {
+                    int bytes = ((ByteBuf) msg).readableBytes();
+                    directProxyHandler.getInboundChannelRequestsRate().recordEvent(bytes);
+                    ProxyService.BYTES_COUNTER.inc(bytes);
+                }
+                directProxyHandler.outboundChannel.writeAndFlush(msg)
+                        .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+            } else {
+                LOG.warn("Received message of type {} while connection to broker is missing in state {}. "
+                                + "Dropping the input message (readable bytes={}).", msg.getClass(), state,
+                        msg instanceof ByteBuf ? ((ByteBuf) msg).readableBytes() : -1);
             }
-            directProxyHandler.outboundChannel.writeAndFlush(msg)
-                    .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
             break;
-
+        case ProxyConnectingToBroker:
+            LOG.warn("Received message of type {} while connecting to broker. "
+                            + "Dropping the input message (readable bytes={}).", msg.getClass(),
+                    msg instanceof ByteBuf ? ((ByteBuf) msg).readableBytes() : -1);

Review Comment:
   In this case it's better to see if this warning gets ever logged. There's a possibility that some older Go client versions misbehave in some cases and disconnecting them would break the connectivity completely. Logging will reveal if that happens.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] michaeljmarshall commented on a diff in pull request #15366: [Proxy] Fix proxy connection leak when inbound connection closes while connecting is in progress

Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on code in PR #15366:
URL: https://github.com/apache/pulsar/pull/15366#discussion_r861134219


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -216,18 +234,26 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exce
             break;
 
         case ProxyConnectionToBroker:
-            // Pass the buffer to the outbound connection and schedule next read
-            // only if we can write on the connection
-            ProxyService.OPS_COUNTER.inc();
-            if (msg instanceof ByteBuf) {
-                int bytes = ((ByteBuf) msg).readableBytes();
-                directProxyHandler.getInboundChannelRequestsRate().recordEvent(bytes);
-                ProxyService.BYTES_COUNTER.inc(bytes);
+            if (directProxyHandler != null) {
+                ProxyService.OPS_COUNTER.inc();
+                if (msg instanceof ByteBuf) {
+                    int bytes = ((ByteBuf) msg).readableBytes();
+                    directProxyHandler.getInboundChannelRequestsRate().recordEvent(bytes);
+                    ProxyService.BYTES_COUNTER.inc(bytes);
+                }
+                directProxyHandler.outboundChannel.writeAndFlush(msg)
+                        .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+            } else {
+                LOG.warn("Received message of type {} while connection to broker is missing in state {}. "
+                                + "Dropping the input message (readable bytes={}).", msg.getClass(), state,
+                        msg instanceof ByteBuf ? ((ByteBuf) msg).readableBytes() : -1);
             }
-            directProxyHandler.outboundChannel.writeAndFlush(msg)
-                    .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
             break;
-
+        case ProxyConnectingToBroker:
+            LOG.warn("Received message of type {} while connecting to broker. "
+                            + "Dropping the input message (readable bytes={}).", msg.getClass(),
+                    msg instanceof ByteBuf ? ((ByteBuf) msg).readableBytes() : -1);

Review Comment:
   Okay. If we find that the go client is misbehaving, it'd be ideal to get a minor version released of the go client fixing that issue (I don't believe we currently do that for the go client, but implementing the protocol correctly is essential).
   
   Also, I realize now that this kind of misbehaving is a bit different than the one we found in the broker. That one had to do with sending messages before the producer's creation finished. In this case, the dropped protocol message would be a create producer or consumer message, which is not as bad.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lhotari commented on a diff in pull request #15366: [Proxy] Fix proxy connection leak when inbound connection closes while connecting is in progress

Posted by GitBox <gi...@apache.org>.
lhotari commented on code in PR #15366:
URL: https://github.com/apache/pulsar/pull/15366#discussion_r860815319


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -183,11 +190,22 @@ public synchronized void channelInactive(ChannelHandlerContext ctx) throws Excep
 
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-        state = State.Closing;
         super.exceptionCaught(ctx, cause);
-        LOG.warn("[{}] Got exception {} : {} {}", remoteAddress, cause.getClass().getSimpleName(), cause.getMessage(),
+        LOG.warn("[{}] Got exception {} : Message: {} State: {}", remoteAddress, cause.getClass().getSimpleName(),
+                cause.getMessage(), state,
                 ClientCnx.isKnownException(cause) ? null : cause);
-        ctx.close();
+        if (state != State.Closed) {
+            state = State.Closing;
+        }
+        if (ctx.channel().isOpen()) {
+            ctx.close();
+        } else {
+            // close connection to broker if that is present
+            if (directProxyHandler != null) {

Review Comment:
   `ctx.close()` is asynchronous and is supposed to take care of closing the directProxyHandler. That will only happen if channel isn't yet close. This method executes in the context of the event loop for the channel so there's no risk of races.
   directProxyHandler.close will also trigger closing of the client-to-proxy connection, but that happens only when directProxyHandler is present and isn't yet closed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] michaeljmarshall commented on a diff in pull request #15366: [Proxy] Fix proxy connection leak when inbound connection closes while connecting is in progress

Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on code in PR #15366:
URL: https://github.com/apache/pulsar/pull/15366#discussion_r861109478


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -313,6 +334,43 @@ private synchronized void completeConnect(AuthData clientData) throws PulsarClie
         }
     }
 
+    private void handleBrokerConnected(DirectProxyHandler directProxyHandler, CommandConnected connected) {
+        checkState(ctx.executor().inEventLoop(), "This method should be called in the event loop");
+        if (state == State.ProxyConnectingToBroker && ctx.channel().isOpen() && this.directProxyHandler == null) {
+            this.directProxyHandler = directProxyHandler;
+            state = State.ProxyConnectionToBroker;
+            int maxMessageSize =
+                    connected.hasMaxMessageSize() ? connected.getMaxMessageSize() : Commands.INVALID_MAX_MESSAGE_SIZE;
+            ctx.writeAndFlush(Commands.newConnected(connected.getProtocolVersion(), maxMessageSize))
+                    .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+        } else {
+            LOG.warn("[{}] Channel is {}. ProxyConnection is in {}. "
+                            + "Closing connection to broker '{}'.",
+                    remoteAddress, ctx.channel().isOpen() ? "open" : "already closed",
+                    state != State.ProxyConnectingToBroker ? "invalid state " + state : "state " + state,
+                    proxyToBrokerUrl);
+            directProxyHandler.close();

Review Comment:
   What is the purpose of closing this here? I'd expect the `channelInactive` method to close it. Is it an optimization to close the channel as early as possible? If so, should we set it to `null`?



##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -216,18 +234,26 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exce
             break;
 
         case ProxyConnectionToBroker:
-            // Pass the buffer to the outbound connection and schedule next read
-            // only if we can write on the connection
-            ProxyService.OPS_COUNTER.inc();
-            if (msg instanceof ByteBuf) {
-                int bytes = ((ByteBuf) msg).readableBytes();
-                directProxyHandler.getInboundChannelRequestsRate().recordEvent(bytes);
-                ProxyService.BYTES_COUNTER.inc(bytes);
+            if (directProxyHandler != null) {
+                ProxyService.OPS_COUNTER.inc();
+                if (msg instanceof ByteBuf) {
+                    int bytes = ((ByteBuf) msg).readableBytes();
+                    directProxyHandler.getInboundChannelRequestsRate().recordEvent(bytes);
+                    ProxyService.BYTES_COUNTER.inc(bytes);
+                }
+                directProxyHandler.outboundChannel.writeAndFlush(msg)
+                        .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+            } else {
+                LOG.warn("Received message of type {} while connection to broker is missing in state {}. "
+                                + "Dropping the input message (readable bytes={}).", msg.getClass(), state,
+                        msg instanceof ByteBuf ? ((ByteBuf) msg).readableBytes() : -1);
             }
-            directProxyHandler.outboundChannel.writeAndFlush(msg)
-                    .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
             break;
-
+        case ProxyConnectingToBroker:
+            LOG.warn("Received message of type {} while connecting to broker. "
+                            + "Dropping the input message (readable bytes={}).", msg.getClass(),
+                    msg instanceof ByteBuf ? ((ByteBuf) msg).readableBytes() : -1);

Review Comment:
   Depending on the type of protocol message, dropping it could be problematic. For example, if the protocol message is a delivered message, it could lead to out of order persistence, as we saw in the broker.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lhotari commented on a diff in pull request #15366: [Proxy] Fix proxy connection leak when inbound connection closes while connecting is in progress

Posted by GitBox <gi...@apache.org>.
lhotari commented on code in PR #15366:
URL: https://github.com/apache/pulsar/pull/15366#discussion_r861093797


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -313,6 +334,42 @@ private synchronized void completeConnect(AuthData clientData) throws PulsarClie
         }
     }
 
+    private void handleBrokerConnected(DirectProxyHandler directProxyHandler, CommandConnected connected) {
+        checkState(ctx.executor().inEventLoop(), "This method should be called in the event loop");
+        if (state == State.ProxyConnectingToBroker && ctx.channel().isOpen() && this.directProxyHandler == null) {
+            this.directProxyHandler = directProxyHandler;
+            state = State.ProxyConnectionToBroker;
+            int maxMessageSize =
+                    connected.hasMaxMessageSize() ? connected.getMaxMessageSize() : Commands.INVALID_MAX_MESSAGE_SIZE;
+            ctx.writeAndFlush(Commands.newConnected(connected.getProtocolVersion(), maxMessageSize))
+                    .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+        } else {
+            LOG.warn("[{}] Channel is {}. ProxyConnection is in {}. "
+                            + "Closing connection to broker '{}'.",
+                    remoteAddress, ctx.channel().isOpen() ? "open" : "already closed",
+                    state != State.ProxyConnectingToBroker ? "invalid state " + state : "state " + state,
+                    proxyToBrokerUrl);
+            directProxyHandler.close();
+            ctx.close();
+        }
+    }
+
+    private void connectToBroker(InetSocketAddress brokerAddress) {
+        checkState(ctx.executor().inEventLoop(), "This method should be called in the event loop");
+        DirectProxyHandler directProxyHandler = new DirectProxyHandler(service, this);
+        directProxyHandler.connect(proxyToBrokerUrl, brokerAddress,
+                protocolVersionToAdvertise, sslHandlerSupplier);
+    }
+
+    public void brokerConnected(DirectProxyHandler directProxyHandler, CommandConnected connected) {
+        try {
+            final CommandConnected finalConnected = new CommandConnected().copyFrom(connected);
+            ctx.executor().submit(() -> handleBrokerConnected(directProxyHandler, finalConnected));
+        } catch (RejectedExecutionException e) {
+            directProxyHandler.close();

Review Comment:
   logging added



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lhotari merged pull request #15366: [Proxy] Fix proxy connection leak when inbound connection closes while connecting is in progress

Posted by GitBox <gi...@apache.org>.
lhotari merged PR #15366:
URL: https://github.com/apache/pulsar/pull/15366


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lhotari commented on a diff in pull request #15366: [Proxy] Fix proxy connection leak when inbound connection closes while connecting is in progress

Posted by GitBox <gi...@apache.org>.
lhotari commented on code in PR #15366:
URL: https://github.com/apache/pulsar/pull/15366#discussion_r860800142


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -216,18 +234,26 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exce
             break;
 
         case ProxyConnectionToBroker:
-            // Pass the buffer to the outbound connection and schedule next read
-            // only if we can write on the connection
-            ProxyService.OPS_COUNTER.inc();
-            if (msg instanceof ByteBuf) {
-                int bytes = ((ByteBuf) msg).readableBytes();
-                directProxyHandler.getInboundChannelRequestsRate().recordEvent(bytes);
-                ProxyService.BYTES_COUNTER.inc(bytes);
+            if (directProxyHandler != null) {
+                ProxyService.OPS_COUNTER.inc();
+                if (msg instanceof ByteBuf) {
+                    int bytes = ((ByteBuf) msg).readableBytes();
+                    directProxyHandler.getInboundChannelRequestsRate().recordEvent(bytes);
+                    ProxyService.BYTES_COUNTER.inc(bytes);
+                }
+                directProxyHandler.outboundChannel.writeAndFlush(msg)

Review Comment:
   yes, that would make sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] michaeljmarshall commented on a diff in pull request #15366: [Proxy] Fix proxy connection leak when inbound connection closes while connecting is in progress

Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on code in PR #15366:
URL: https://github.com/apache/pulsar/pull/15366#discussion_r861098843


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -183,11 +190,22 @@ public synchronized void channelInactive(ChannelHandlerContext ctx) throws Excep
 
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-        state = State.Closing;
         super.exceptionCaught(ctx, cause);
-        LOG.warn("[{}] Got exception {} : {} {}", remoteAddress, cause.getClass().getSimpleName(), cause.getMessage(),
+        LOG.warn("[{}] Got exception {} : Message: {} State: {}", remoteAddress, cause.getClass().getSimpleName(),
+                cause.getMessage(), state,
                 ClientCnx.isKnownException(cause) ? null : cause);
-        ctx.close();
+        if (state != State.Closed) {
+            state = State.Closing;
+        }
+        if (ctx.channel().isOpen()) {

Review Comment:
   I see that the `directProxyHandler` might not be closed yet, but wouldn't we expect it to be closed soon, so we don't need to close it in this method?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lhotari commented on a diff in pull request #15366: [Proxy] Fix proxy connection leak when inbound connection closes while connecting is in progress

Posted by GitBox <gi...@apache.org>.
lhotari commented on code in PR #15366:
URL: https://github.com/apache/pulsar/pull/15366#discussion_r861116060


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -313,6 +334,43 @@ private synchronized void completeConnect(AuthData clientData) throws PulsarClie
         }
     }
 
+    private void handleBrokerConnected(DirectProxyHandler directProxyHandler, CommandConnected connected) {
+        checkState(ctx.executor().inEventLoop(), "This method should be called in the event loop");
+        if (state == State.ProxyConnectingToBroker && ctx.channel().isOpen() && this.directProxyHandler == null) {
+            this.directProxyHandler = directProxyHandler;
+            state = State.ProxyConnectionToBroker;
+            int maxMessageSize =
+                    connected.hasMaxMessageSize() ? connected.getMaxMessageSize() : Commands.INVALID_MAX_MESSAGE_SIZE;
+            ctx.writeAndFlush(Commands.newConnected(connected.getProtocolVersion(), maxMessageSize))
+                    .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+        } else {
+            LOG.warn("[{}] Channel is {}. ProxyConnection is in {}. "
+                            + "Closing connection to broker '{}'.",
+                    remoteAddress, ctx.channel().isOpen() ? "open" : "already closed",
+                    state != State.ProxyConnectingToBroker ? "invalid state " + state : "state " + state,
+                    proxyToBrokerUrl);
+            directProxyHandler.close();

Review Comment:
   `directProxyHandler` is the method parameter in this case. it's not set to any field at this point. The instance will only be set to the field if the state is what is expected (`state == State.ProxyConnectingToBroker && ctx.channel().isOpen() && this.directProxyHandler == null`). 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] michaeljmarshall commented on a diff in pull request #15366: [Proxy] Fix proxy connection leak when inbound connection closes while connecting is in progress

Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on code in PR #15366:
URL: https://github.com/apache/pulsar/pull/15366#discussion_r861122916


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -216,18 +234,26 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exce
             break;
 
         case ProxyConnectionToBroker:
-            // Pass the buffer to the outbound connection and schedule next read
-            // only if we can write on the connection
-            ProxyService.OPS_COUNTER.inc();
-            if (msg instanceof ByteBuf) {
-                int bytes = ((ByteBuf) msg).readableBytes();
-                directProxyHandler.getInboundChannelRequestsRate().recordEvent(bytes);
-                ProxyService.BYTES_COUNTER.inc(bytes);
+            if (directProxyHandler != null) {
+                ProxyService.OPS_COUNTER.inc();
+                if (msg instanceof ByteBuf) {
+                    int bytes = ((ByteBuf) msg).readableBytes();
+                    directProxyHandler.getInboundChannelRequestsRate().recordEvent(bytes);
+                    ProxyService.BYTES_COUNTER.inc(bytes);
+                }
+                directProxyHandler.outboundChannel.writeAndFlush(msg)
+                        .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+            } else {
+                LOG.warn("Received message of type {} while connection to broker is missing in state {}. "
+                                + "Dropping the input message (readable bytes={}).", msg.getClass(), state,
+                        msg instanceof ByteBuf ? ((ByteBuf) msg).readableBytes() : -1);
             }
-            directProxyHandler.outboundChannel.writeAndFlush(msg)
-                    .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
             break;
-
+        case ProxyConnectingToBroker:
+            LOG.warn("Received message of type {} while connecting to broker. "
+                            + "Dropping the input message (readable bytes={}).", msg.getClass(),
+                    msg instanceof ByteBuf ? ((ByteBuf) msg).readableBytes() : -1);

Review Comment:
   Yes, it would be misbehaving in that case. In the broker, we close the connection if a `send` message is sent before the producer is created successfully because otherwise, messages could be persisted out of order.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lhotari commented on a diff in pull request #15366: [Proxy] Fix proxy connection leak when inbound connection closes while connecting is in progress

Posted by GitBox <gi...@apache.org>.
lhotari commented on code in PR #15366:
URL: https://github.com/apache/pulsar/pull/15366#discussion_r861118679


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -313,6 +334,43 @@ private synchronized void completeConnect(AuthData clientData) throws PulsarClie
         }
     }
 
+    private void handleBrokerConnected(DirectProxyHandler directProxyHandler, CommandConnected connected) {
+        checkState(ctx.executor().inEventLoop(), "This method should be called in the event loop");
+        if (state == State.ProxyConnectingToBroker && ctx.channel().isOpen() && this.directProxyHandler == null) {
+            this.directProxyHandler = directProxyHandler;
+            state = State.ProxyConnectionToBroker;
+            int maxMessageSize =
+                    connected.hasMaxMessageSize() ? connected.getMaxMessageSize() : Commands.INVALID_MAX_MESSAGE_SIZE;
+            ctx.writeAndFlush(Commands.newConnected(connected.getProtocolVersion(), maxMessageSize))
+                    .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+        } else {
+            LOG.warn("[{}] Channel is {}. ProxyConnection is in {}. "
+                            + "Closing connection to broker '{}'.",
+                    remoteAddress, ctx.channel().isOpen() ? "open" : "already closed",
+                    state != State.ProxyConnectingToBroker ? "invalid state " + state : "state " + state,
+                    proxyToBrokerUrl);
+            directProxyHandler.close();

Review Comment:
   the else branch would get executed in the case if the incoming connection gets closed while the broker connection is being created. That's the race that this PR intends to fix.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] michaeljmarshall commented on a diff in pull request #15366: [Proxy] Fix proxy connection leak when inbound connection closes while connecting is in progress

Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on code in PR #15366:
URL: https://github.com/apache/pulsar/pull/15366#discussion_r861097477


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -183,11 +190,22 @@ public synchronized void channelInactive(ChannelHandlerContext ctx) throws Excep
 
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-        state = State.Closing;
         super.exceptionCaught(ctx, cause);
-        LOG.warn("[{}] Got exception {} : {} {}", remoteAddress, cause.getClass().getSimpleName(), cause.getMessage(),
+        LOG.warn("[{}] Got exception {} : Message: {} State: {}", remoteAddress, cause.getClass().getSimpleName(),
+                cause.getMessage(), state,
                 ClientCnx.isKnownException(cause) ? null : cause);
-        ctx.close();
+        if (state != State.Closed) {
+            state = State.Closing;
+        }
+        if (ctx.channel().isOpen()) {

Review Comment:
   If it is already closed, why wouldn't `directProxyHandler` also be closed? Isn't the `channelInactive` method always called when the channel is closed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] eolivelli commented on a diff in pull request #15366: [Proxy] Fix proxy connection leak when inbound connection closes while connecting is in progress

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #15366:
URL: https://github.com/apache/pulsar/pull/15366#discussion_r860792340


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -183,11 +190,22 @@ public synchronized void channelInactive(ChannelHandlerContext ctx) throws Excep
 
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-        state = State.Closing;
         super.exceptionCaught(ctx, cause);
-        LOG.warn("[{}] Got exception {} : {} {}", remoteAddress, cause.getClass().getSimpleName(), cause.getMessage(),
+        LOG.warn("[{}] Got exception {} : Message: {} State: {}", remoteAddress, cause.getClass().getSimpleName(),
+                cause.getMessage(), state,
                 ClientCnx.isKnownException(cause) ? null : cause);
-        ctx.close();
+        if (state != State.Closed) {
+            state = State.Closing;
+        }
+        if (ctx.channel().isOpen()) {

Review Comment:
   why can't we blindly call "ctx.close()" ?



##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -183,11 +190,22 @@ public synchronized void channelInactive(ChannelHandlerContext ctx) throws Excep
 
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-        state = State.Closing;
         super.exceptionCaught(ctx, cause);
-        LOG.warn("[{}] Got exception {} : {} {}", remoteAddress, cause.getClass().getSimpleName(), cause.getMessage(),
+        LOG.warn("[{}] Got exception {} : Message: {} State: {}", remoteAddress, cause.getClass().getSimpleName(),
+                cause.getMessage(), state,
                 ClientCnx.isKnownException(cause) ? null : cause);
-        ctx.close();
+        if (state != State.Closed) {
+            state = State.Closing;
+        }
+        if (ctx.channel().isOpen()) {
+            ctx.close();
+        } else {
+            // close connection to broker if that is present
+            if (directProxyHandler != null) {

Review Comment:
   why can't we do this every time ?



##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -216,18 +234,26 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exce
             break;
 
         case ProxyConnectionToBroker:
-            // Pass the buffer to the outbound connection and schedule next read
-            // only if we can write on the connection
-            ProxyService.OPS_COUNTER.inc();
-            if (msg instanceof ByteBuf) {
-                int bytes = ((ByteBuf) msg).readableBytes();
-                directProxyHandler.getInboundChannelRequestsRate().recordEvent(bytes);
-                ProxyService.BYTES_COUNTER.inc(bytes);
+            if (directProxyHandler != null) {
+                ProxyService.OPS_COUNTER.inc();
+                if (msg instanceof ByteBuf) {
+                    int bytes = ((ByteBuf) msg).readableBytes();
+                    directProxyHandler.getInboundChannelRequestsRate().recordEvent(bytes);
+                    ProxyService.BYTES_COUNTER.inc(bytes);
+                }
+                directProxyHandler.outboundChannel.writeAndFlush(msg)

Review Comment:
   it looks like we are hiding "directProxyHandler.outboundChannel" in close(),
   what about creating a method 
   `directProxyHandler.writeOutboundAndFlush(msg)` ?  (not sure about the name)
   
   this way we can make `outboundChannel` private



##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -313,6 +334,42 @@ private synchronized void completeConnect(AuthData clientData) throws PulsarClie
         }
     }
 
+    private void handleBrokerConnected(DirectProxyHandler directProxyHandler, CommandConnected connected) {
+        checkState(ctx.executor().inEventLoop(), "This method should be called in the event loop");
+        if (state == State.ProxyConnectingToBroker && ctx.channel().isOpen() && this.directProxyHandler == null) {
+            this.directProxyHandler = directProxyHandler;
+            state = State.ProxyConnectionToBroker;
+            int maxMessageSize =
+                    connected.hasMaxMessageSize() ? connected.getMaxMessageSize() : Commands.INVALID_MAX_MESSAGE_SIZE;
+            ctx.writeAndFlush(Commands.newConnected(connected.getProtocolVersion(), maxMessageSize))
+                    .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+        } else {
+            LOG.warn("[{}] Channel is {}. ProxyConnection is in {}. "
+                            + "Closing connection to broker '{}'.",
+                    remoteAddress, ctx.channel().isOpen() ? "open" : "already closed",
+                    state != State.ProxyConnectingToBroker ? "invalid state " + state : "state " + state,
+                    proxyToBrokerUrl);
+            directProxyHandler.close();
+            ctx.close();
+        }
+    }
+
+    private void connectToBroker(InetSocketAddress brokerAddress) {
+        checkState(ctx.executor().inEventLoop(), "This method should be called in the event loop");
+        DirectProxyHandler directProxyHandler = new DirectProxyHandler(service, this);
+        directProxyHandler.connect(proxyToBrokerUrl, brokerAddress,
+                protocolVersionToAdvertise, sslHandlerSupplier);
+    }
+
+    public void brokerConnected(DirectProxyHandler directProxyHandler, CommandConnected connected) {
+        try {
+            final CommandConnected finalConnected = new CommandConnected().copyFrom(connected);
+            ctx.executor().submit(() -> handleBrokerConnected(directProxyHandler, finalConnected));
+        } catch (RejectedExecutionException e) {
+            directProxyHandler.close();

Review Comment:
   is it probably better to log this case



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org