You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/04/29 14:47:02 UTC

[pulsar] branch branch-2.10 updated (ce89b53add4 -> 5da16375cd1)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a change to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from ce89b53add4 Add filteredMsgCount for `pulsar-admin topics stats` (#14531)
     new d3c523f3e31 [Proxy & Client] Configure Netty DNS resolver to match JDK DNS caching setting, share DNS resolver instance in Proxy (#15219)
     new fde5b47e02e [Proxy] Fix proxy connection leak when inbound connection closes while connecting is in progress (#15366)
     new 5da16375cd1 [Broker/Client] Close connection if a ping or pong message cannot be sent (#15382)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/pulsar/client/impl/ConnectionPool.java  |  24 +++-
 pulsar-common/pom.xml                              |   5 +
 .../pulsar/common/protocol/PulsarHandler.java      |  18 ++-
 .../pulsar/common/util/netty/DnsResolverUtil.java  |  75 +++++++++++
 .../pulsar/proxy/server/DirectProxyHandler.java    |  19 +--
 .../pulsar/proxy/server/ProxyConnection.java       | 138 +++++++++++++++------
 .../pulsar/proxy/server/ProxyConnectionPool.java   |  58 ---------
 .../apache/pulsar/proxy/server/ProxyService.java   |   3 +
 .../proxy/server/ServiceChannelInitializer.java    |   2 +-
 9 files changed, 233 insertions(+), 109 deletions(-)
 create mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java
 delete mode 100644 pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnectionPool.java


[pulsar] 01/03: [Proxy & Client] Configure Netty DNS resolver to match JDK DNS caching setting, share DNS resolver instance in Proxy (#15219)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d3c523f3e31c816d9d19de4a1734de7f2031d2f6
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Wed Apr 20 14:47:25 2022 +0300

    [Proxy & Client] Configure Netty DNS resolver to match JDK DNS caching setting, share DNS resolver instance in Proxy (#15219)
    
    * Align Netty DNS resolver cache settings with Java DNS cache settings
    
    - Netty DNS resolver caches forever by default
      - this could cause problems with Kubernetes deployments
    
    * Share Netty DNSNameResolver in proxy
    
    * Remove overriding of ConnectionPool.close since it's not necessary
    
    * Address review comment: remove ProxyConnectionPool
    
    (cherry picked from commit 39b186269c6ccdfa94412a28cc3719a4338368ac)
---
 .../apache/pulsar/client/impl/ConnectionPool.java  | 24 ++++++-
 pulsar-common/pom.xml                              |  5 ++
 .../pulsar/common/util/netty/DnsResolverUtil.java  | 75 ++++++++++++++++++++++
 .../pulsar/proxy/server/ProxyConnection.java       | 36 ++++++-----
 .../pulsar/proxy/server/ProxyConnectionPool.java   | 58 -----------------
 .../apache/pulsar/proxy/server/ProxyService.java   |  3 +
 .../proxy/server/ServiceChannelInitializer.java    |  2 +-
 7 files changed, 125 insertions(+), 78 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 06b5d24e8b4..edb2a983f25 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -36,6 +36,7 @@ import java.net.URISyntaxException;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -47,6 +48,7 @@ import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.netty.DnsResolverUtil;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,13 +64,20 @@ public class ConnectionPool implements AutoCloseable {
     private final boolean isSniProxy;
 
     protected final DnsNameResolver dnsResolver;
+    private final boolean shouldCloseDnsResolver;
 
     public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
         this(conf, eventLoopGroup, () -> new ClientCnx(conf, eventLoopGroup));
     }
 
     public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup,
-            Supplier<ClientCnx> clientCnxSupplier) throws PulsarClientException {
+                          Supplier<ClientCnx> clientCnxSupplier) throws PulsarClientException {
+        this(conf, eventLoopGroup, clientCnxSupplier, Optional.empty());
+    }
+
+    public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup,
+                             Supplier<ClientCnx> clientCnxSupplier, Optional<DnsNameResolver> dnsNameResolver)
+            throws PulsarClientException {
         this.eventLoopGroup = eventLoopGroup;
         this.clientConfig = conf;
         this.maxConnectionsPerHosts = conf.getConnectionsPerBroker();
@@ -91,6 +100,12 @@ public class ConnectionPool implements AutoCloseable {
             log.error("Failed to create channel initializer");
             throw new PulsarClientException(e);
         }
+
+        this.shouldCloseDnsResolver = !dnsNameResolver.isPresent();
+        this.dnsResolver = dnsNameResolver.orElseGet(() -> createDnsNameResolver(conf, eventLoopGroup));
+    }
+
+    private static DnsNameResolver createDnsNameResolver(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) {
         DnsNameResolverBuilder dnsNameResolverBuilder = new DnsNameResolverBuilder(eventLoopGroup.next())
                 .traceEnabled(true).channelType(EventLoopUtil.getDatagramChannelClass(eventLoopGroup));
         if (conf.getDnsLookupBindAddress() != null) {
@@ -98,7 +113,8 @@ public class ConnectionPool implements AutoCloseable {
                     conf.getDnsLookupBindPort());
             dnsNameResolverBuilder.localAddress(addr);
         }
-        this.dnsResolver = dnsNameResolverBuilder.build();
+        DnsResolverUtil.applyJdkDnsCacheSettings(dnsNameResolverBuilder);
+        return dnsNameResolverBuilder.build();
     }
 
     private static final Random random = new Random();
@@ -320,7 +336,9 @@ public class ConnectionPool implements AutoCloseable {
     @Override
     public void close() throws Exception {
         closeAllConnections();
-        dnsResolver.close();
+        if (shouldCloseDnsResolver) {
+            dnsResolver.close();
+        }
     }
 
     private void cleanupConnection(InetSocketAddress address, int connectionKey,
diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml
index 3b0fba07aaa..643432392a0 100644
--- a/pulsar-common/pom.xml
+++ b/pulsar-common/pom.xml
@@ -72,6 +72,11 @@
       <artifactId>netty-handler</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-resolver-dns</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty-transport-native-epoll</artifactId>
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java
new file mode 100644
index 00000000000..8b06dbf36ec
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.netty;
+
+import io.netty.resolver.dns.DnsNameResolverBuilder;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class DnsResolverUtil {
+    private static final int MIN_TTL = 0;
+    private static final int TTL;
+    private static final int NEGATIVE_TTL;
+
+    // default TTL value when JDK setting is "forever" (-1)
+    private static final int DEFAULT_TTL = 60;
+
+    // default negative TTL value when JDK setting is "forever" (-1)
+    private static final int DEFAULT_NEGATIVE_TTL = 10;
+
+    static {
+        int ttl = DEFAULT_TTL;
+        int negativeTtl = DEFAULT_NEGATIVE_TTL;
+        try {
+            // use reflection to call sun.net.InetAddressCachePolicy's get and getNegative methods for getting
+            // effective JDK settings for DNS caching
+            Class<?> inetAddressCachePolicyClass = Class.forName("sun.net.InetAddressCachePolicy");
+            Method getTTLMethod = inetAddressCachePolicyClass.getMethod("get");
+            ttl = (Integer) getTTLMethod.invoke(null);
+            Method getNegativeTTLMethod = inetAddressCachePolicyClass.getMethod("getNegative");
+            negativeTtl = (Integer) getNegativeTTLMethod.invoke(null);
+        } catch (NoSuchMethodException | ClassNotFoundException | InvocationTargetException
+                 | IllegalAccessException e) {
+            log.warn("Cannot get DNS TTL settings from sun.net.InetAddressCachePolicy class", e);
+        }
+        TTL = useDefaultTTLWhenSetToForever(ttl, DEFAULT_TTL);
+        NEGATIVE_TTL = useDefaultTTLWhenSetToForever(negativeTtl, DEFAULT_NEGATIVE_TTL);
+    }
+
+    private static int useDefaultTTLWhenSetToForever(int ttl, int defaultTtl) {
+        return ttl < 0 ? defaultTtl : ttl;
+    }
+
+    private DnsResolverUtil() {
+        // utility class with static methods, prevent instantiation
+    }
+
+    /**
+     * Configure Netty's {@link DnsNameResolverBuilder}'s ttl and negativeTtl to match the JDK's DNS caching settings.
+     * If the JDK setting for TTL is forever (-1), the TTL will be set to 60 seconds.
+     *
+     * @param dnsNameResolverBuilder The Netty {@link DnsNameResolverBuilder} instance to apply the settings
+     */
+    public static void applyJdkDnsCacheSettings(DnsNameResolverBuilder dnsNameResolverBuilder) {
+        dnsNameResolverBuilder.ttl(MIN_TTL, TTL);
+        dnsNameResolverBuilder.negativeTtl(NEGATIVE_TTL);
+    }
+}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 58203eee51c..99f8f04ea84 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -25,9 +25,11 @@ import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.haproxy.HAProxyMessage;
 import io.netty.handler.ssl.SslHandler;
+import io.netty.resolver.dns.DnsNameResolver;
 import java.net.SocketAddress;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -71,6 +73,7 @@ public class ProxyConnection extends PulsarHandler {
     private final AtomicLong requestIdGenerator =
             new AtomicLong(ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE / 2));
     private final ProxyService service;
+    private final DnsNameResolver dnsNameResolver;
     AuthenticationDataSource authenticationData;
     private State state;
     private final Supplier<SslHandler> sslHandlerSupplier;
@@ -119,9 +122,11 @@ public class ProxyConnection extends PulsarHandler {
         return connectionPool;
     }
 
-    public ProxyConnection(ProxyService proxyService, Supplier<SslHandler> sslHandlerSupplier) {
+    public ProxyConnection(ProxyService proxyService, Supplier<SslHandler> sslHandlerSupplier,
+                           DnsNameResolver dnsNameResolver) {
         super(30, TimeUnit.SECONDS);
         this.service = proxyService;
+        this.dnsNameResolver = dnsNameResolver;
         this.state = State.Init;
         this.sslHandlerSupplier = sslHandlerSupplier;
         this.brokerProxyValidator = service.getBrokerProxyValidator();
@@ -229,27 +234,26 @@ public class ProxyConnection extends PulsarHandler {
     }
 
     private synchronized void completeConnect(AuthData clientData) throws PulsarClientException {
+        Supplier<ClientCnx> clientCnxSupplier;
         if (service.getConfiguration().isAuthenticationEnabled()) {
             if (service.getConfiguration().isForwardAuthorizationCredentials()) {
                 this.clientAuthData = clientData;
                 this.clientAuthMethod = authMethod;
             }
-            if (this.connectionPool == null) {
-                this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
-                        () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData,
-                                clientAuthMethod, protocolVersionToAdvertise));
-            } else {
-                LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {} role {}",
-                        remoteAddress, state, clientAuthRole);
-            }
+            clientCnxSupplier =
+                    () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData,
+                            clientAuthMethod, protocolVersionToAdvertise);
         } else {
-            if (this.connectionPool == null) {
-                this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
-                        () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise));
-            } else {
-                LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {}",
-                        remoteAddress, state);
-            }
+            clientCnxSupplier =
+                    () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise);
+        }
+
+        if (this.connectionPool == null) {
+            this.connectionPool = new ConnectionPool(clientConf, service.getWorkerGroup(),
+                    clientCnxSupplier, Optional.of(dnsNameResolver));
+        } else {
+            LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {} role {}",
+                    remoteAddress, state, clientAuthRole);
         }
 
         LOG.info("[{}] complete connection, init proxy handler. authenticated with {} role {}, hasProxyToBrokerUrl: {}",
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnectionPool.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnectionPool.java
deleted file mode 100644
index 4dcb09570c6..00000000000
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnectionPool.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.proxy.server;
-
-import io.netty.channel.EventLoopGroup;
-import java.io.IOException;
-import java.util.concurrent.ExecutionException;
-import java.util.function.Supplier;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.impl.ClientCnx;
-import org.apache.pulsar.client.impl.ConnectionPool;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ProxyConnectionPool extends ConnectionPool {
-    public ProxyConnectionPool(ClientConfigurationData clientConfig, EventLoopGroup eventLoopGroup,
-            Supplier<ClientCnx> clientCnxSupplier) throws PulsarClientException {
-        super(clientConfig, eventLoopGroup, clientCnxSupplier);
-    }
-
-    @Override
-    public void close() throws IOException {
-        log.info("Closing ProxyConnectionPool.");
-        pool.forEach((address, clientCnxPool) -> {
-            if (clientCnxPool != null) {
-                clientCnxPool.forEach((identifier, clientCnx) -> {
-                    if (clientCnx != null && clientCnx.isDone()) {
-                        try {
-                            clientCnx.get().close();
-                        } catch (InterruptedException | ExecutionException e) {
-                            log.error("Unable to close get client connection future.", e);
-                        }
-                    }
-                });
-            }
-        });
-        dnsResolver.close();
-    }
-
-    private static final Logger log = LoggerFactory.getLogger(ProxyConnectionPool.class);
-}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 6a830657423..10b99aeaff1 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -63,6 +63,7 @@ import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.util.netty.DnsResolverUtil;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
@@ -163,6 +164,8 @@ public class ProxyService implements Closeable {
 
         DnsNameResolverBuilder dnsNameResolverBuilder = new DnsNameResolverBuilder(workerGroup.next())
                 .channelType(EventLoopUtil.getDatagramChannelClass(workerGroup));
+        DnsResolverUtil.applyJdkDnsCacheSettings(dnsNameResolverBuilder);
+
         dnsNameResolver = dnsNameResolverBuilder.build();
 
         brokerProxyValidator = new BrokerProxyValidator(dnsNameResolver.asAddressResolver(),
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
index e5f15b66d31..1a588b481fc 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
@@ -173,7 +173,7 @@ public class ServiceChannelInitializer extends ChannelInitializer<SocketChannel>
         }
 
         ch.pipeline().addLast("handler",
-                new ProxyConnection(proxyService, sslHandlerSupplier));
+                new ProxyConnection(proxyService, sslHandlerSupplier, proxyService.getDnsNameResolver()));
 
     }
 }


[pulsar] 03/03: [Broker/Client] Close connection if a ping or pong message cannot be sent (#15382)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5da16375cd168404fabc4e64b11a1c6e939c9ec1
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Fri Apr 29 17:07:11 2022 +0300

    [Broker/Client] Close connection if a ping or pong message cannot be sent (#15382)
    
    * [Broker/Client] Close connection if a ping message cannot be sent
    
    - the connection should be closed if a ping message cannot be sent
    
    * Handle write errors for pong messages
    
    (cherry picked from commit 2ddef95f31ce37486f3f76b4d59730361a77bf6e)
---
 .../apache/pulsar/common/protocol/PulsarHandler.java   | 18 ++++++++++++++++--
 1 file changed, 16 insertions(+), 2 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
index 42e45cb6528..03fa4c17b08 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
@@ -83,7 +83,14 @@ public abstract class PulsarHandler extends PulsarDecoder {
         if (log.isDebugEnabled()) {
             log.debug("[{}] Replying back to ping message", ctx.channel());
         }
-        ctx.writeAndFlush(Commands.newPong());
+        ctx.writeAndFlush(Commands.newPong())
+                .addListener(future -> {
+                    if (!future.isSuccess()) {
+                        log.warn("[{}] Forcing connection to close since cannot send a pong message.",
+                                ctx.channel(), future.cause());
+                        ctx.close();
+                    }
+                });
     }
 
     @Override
@@ -110,7 +117,14 @@ public abstract class PulsarHandler extends PulsarDecoder {
                 log.debug("[{}] Sending ping message", ctx.channel());
             }
             waitingForPingResponse = true;
-            ctx.writeAndFlush(Commands.newPing());
+            ctx.writeAndFlush(Commands.newPing())
+                    .addListener(future -> {
+                        if (!future.isSuccess()) {
+                            log.warn("[{}] Forcing connection to close since cannot send a ping message.",
+                                    ctx.channel(), future.cause());
+                            ctx.close();
+                        }
+                    });
         } else {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Peer doesn't support keep-alive", ctx.channel());


[pulsar] 02/03: [Proxy] Fix proxy connection leak when inbound connection closes while connecting is in progress (#15366)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fde5b47e02e628e33d745c4fcaa3301afb34a9c9
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Fri Apr 29 14:59:32 2022 +0300

    [Proxy] Fix proxy connection leak when inbound connection closes while connecting is in progress (#15366)
    
    * [Proxy] Fix proxy connection leak when inbound connection closes while connecting is in progress
    
    * Add logging when execution is rejected
    
    (cherry picked from commit 4ae070c5b26be3bf92c007ca309bd4adbae5ce93)
---
 .../pulsar/proxy/server/DirectProxyHandler.java    |  19 ++--
 .../pulsar/proxy/server/ProxyConnection.java       | 102 ++++++++++++++++-----
 2 files changed, 92 insertions(+), 29 deletions(-)

diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index 0cbceb191b1..a632f0e7372 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -80,9 +80,7 @@ public class DirectProxyHandler {
     private final ProxyService service;
     private final Runnable onHandshakeCompleteAction;
 
-    public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, String brokerHostAndPort,
-                              InetSocketAddress targetBrokerAddress, int protocolVersion,
-                              Supplier<SslHandler> sslHandlerSupplier) {
+    public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection) {
         this.service = service;
         this.authentication = proxyConnection.getClientAuthentication();
         this.inboundChannel = proxyConnection.ctx().channel();
@@ -92,6 +90,10 @@ public class DirectProxyHandler {
         this.clientAuthData = proxyConnection.clientAuthData;
         this.clientAuthMethod = proxyConnection.clientAuthMethod;
         this.onHandshakeCompleteAction = proxyConnection::cancelKeepAliveTask;
+    }
+
+    public void connect(String brokerHostAndPort, InetSocketAddress targetBrokerAddress,
+                           int protocolVersion, Supplier<SslHandler> sslHandlerSupplier) {
         ProxyConfiguration config = service.getConfiguration();
 
         // Start the connection attempt.
@@ -208,6 +210,12 @@ public class DirectProxyHandler {
             (byte) 'Y',
     };
 
+    public void close() {
+        if (outboundChannel != null) {
+            outboundChannel.close();
+        }
+    }
+
     enum BackendState {
         Init, HandshakeCompleted
     }
@@ -344,10 +352,7 @@ public class DirectProxyHandler {
             onHandshakeCompleteAction.run();
             startDirectProxying(connected);
 
-            int maxMessageSize =
-                    connected.hasMaxMessageSize() ? connected.getMaxMessageSize() : Commands.INVALID_MAX_MESSAGE_SIZE;
-            inboundChannel.writeAndFlush(Commands.newConnected(connected.getProtocolVersion(), maxMessageSize))
-                    .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+            proxyConnection.brokerConnected(DirectProxyHandler.this, connected);
         }
 
         private void startDirectProxying(CommandConnected connected) {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 99f8f04ea84..33dc1abd88a 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.proxy.server;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandler;
@@ -26,10 +27,12 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.haproxy.HAProxyMessage;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.resolver.dns.DnsNameResolver;
+import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -50,6 +53,7 @@ import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.api.AuthData;
 import org.apache.pulsar.common.api.proto.CommandAuthResponse;
 import org.apache.pulsar.common.api.proto.CommandConnect;
+import org.apache.pulsar.common.api.proto.CommandConnected;
 import org.apache.pulsar.common.api.proto.CommandGetSchema;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
 import org.apache.pulsar.common.api.proto.CommandLookupTopic;
@@ -108,6 +112,9 @@ public class ProxyConnection extends PulsarHandler {
         // Follow redirects
         ProxyLookupRequests,
 
+        // Connecting to the broker
+        ProxyConnectingToBroker,
+
         // If we are proxying a connection to a specific broker, we
         // are just forwarding data between the 2 connections, without
         // looking into it
@@ -161,8 +168,8 @@ public class ProxyConnection extends PulsarHandler {
     public synchronized void channelInactive(ChannelHandlerContext ctx) throws Exception {
         super.channelInactive(ctx);
 
-        if (directProxyHandler != null && directProxyHandler.outboundChannel != null) {
-            directProxyHandler.outboundChannel.close();
+        if (directProxyHandler != null) {
+            directProxyHandler.close();
             directProxyHandler = null;
         }
 
@@ -183,11 +190,22 @@ public class ProxyConnection extends PulsarHandler {
 
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-        state = State.Closing;
         super.exceptionCaught(ctx, cause);
-        LOG.warn("[{}] Got exception {} : {} {}", remoteAddress, cause.getClass().getSimpleName(), cause.getMessage(),
+        LOG.warn("[{}] Got exception {} : Message: {} State: {}", remoteAddress, cause.getClass().getSimpleName(),
+                cause.getMessage(), state,
                 ClientCnx.isKnownException(cause) ? null : cause);
-        ctx.close();
+        if (state != State.Closed) {
+            state = State.Closing;
+        }
+        if (ctx.channel().isOpen()) {
+            ctx.close();
+        } else {
+            // close connection to broker if that is present
+            if (directProxyHandler != null) {
+                directProxyHandler.close();
+                directProxyHandler = null;
+            }
+        }
     }
 
     @Override
@@ -216,18 +234,26 @@ public class ProxyConnection extends PulsarHandler {
             break;
 
         case ProxyConnectionToBroker:
-            // Pass the buffer to the outbound connection and schedule next read
-            // only if we can write on the connection
-            ProxyService.OPS_COUNTER.inc();
-            if (msg instanceof ByteBuf) {
-                int bytes = ((ByteBuf) msg).readableBytes();
-                directProxyHandler.getInboundChannelRequestsRate().recordEvent(bytes);
-                ProxyService.BYTES_COUNTER.inc(bytes);
+            if (directProxyHandler != null) {
+                ProxyService.OPS_COUNTER.inc();
+                if (msg instanceof ByteBuf) {
+                    int bytes = ((ByteBuf) msg).readableBytes();
+                    directProxyHandler.getInboundChannelRequestsRate().recordEvent(bytes);
+                    ProxyService.BYTES_COUNTER.inc(bytes);
+                }
+                directProxyHandler.outboundChannel.writeAndFlush(msg)
+                        .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+            } else {
+                LOG.warn("Received message of type {} while connection to broker is missing in state {}. "
+                                + "Dropping the input message (readable bytes={}).", msg.getClass(), state,
+                        msg instanceof ByteBuf ? ((ByteBuf) msg).readableBytes() : -1);
             }
-            directProxyHandler.outboundChannel.writeAndFlush(msg)
-                    .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
             break;
-
+        case ProxyConnectingToBroker:
+            LOG.warn("Received message of type {} while connecting to broker. "
+                            + "Dropping the input message (readable bytes={}).", msg.getClass(),
+                    msg instanceof ByteBuf ? ((ByteBuf) msg).readableBytes() : -1);
+            break;
         default:
             break;
         }
@@ -273,14 +299,9 @@ public class ProxyConnection extends PulsarHandler {
                 return;
             }
 
+            state = State.ProxyConnectingToBroker;
             brokerProxyValidator.resolveAndCheckTargetAddress(proxyToBrokerUrl)
-                    .thenAcceptAsync(address -> {
-                        // Client already knows which broker to connect. Let's open a
-                        // connection there and just pass bytes in both directions
-                        state = State.ProxyConnectionToBroker;
-                        directProxyHandler = new DirectProxyHandler(service, this, proxyToBrokerUrl, address,
-                                protocolVersionToAdvertise, sslHandlerSupplier);
-                    }, ctx.executor())
+                    .thenAcceptAsync(this::connectToBroker, ctx.executor())
                     .exceptionally(throwable -> {
                         if (throwable instanceof TargetAddressDeniedException
                                 || throwable.getCause() instanceof TargetAddressDeniedException) {
@@ -313,6 +334,43 @@ public class ProxyConnection extends PulsarHandler {
         }
     }
 
+    private void handleBrokerConnected(DirectProxyHandler directProxyHandler, CommandConnected connected) {
+        checkState(ctx.executor().inEventLoop(), "This method should be called in the event loop");
+        if (state == State.ProxyConnectingToBroker && ctx.channel().isOpen() && this.directProxyHandler == null) {
+            this.directProxyHandler = directProxyHandler;
+            state = State.ProxyConnectionToBroker;
+            int maxMessageSize =
+                    connected.hasMaxMessageSize() ? connected.getMaxMessageSize() : Commands.INVALID_MAX_MESSAGE_SIZE;
+            ctx.writeAndFlush(Commands.newConnected(connected.getProtocolVersion(), maxMessageSize))
+                    .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+        } else {
+            LOG.warn("[{}] Channel is {}. ProxyConnection is in {}. "
+                            + "Closing connection to broker '{}'.",
+                    remoteAddress, ctx.channel().isOpen() ? "open" : "already closed",
+                    state != State.ProxyConnectingToBroker ? "invalid state " + state : "state " + state,
+                    proxyToBrokerUrl);
+            directProxyHandler.close();
+            ctx.close();
+        }
+    }
+
+    private void connectToBroker(InetSocketAddress brokerAddress) {
+        checkState(ctx.executor().inEventLoop(), "This method should be called in the event loop");
+        DirectProxyHandler directProxyHandler = new DirectProxyHandler(service, this);
+        directProxyHandler.connect(proxyToBrokerUrl, brokerAddress,
+                protocolVersionToAdvertise, sslHandlerSupplier);
+    }
+
+    public void brokerConnected(DirectProxyHandler directProxyHandler, CommandConnected connected) {
+        try {
+            final CommandConnected finalConnected = new CommandConnected().copyFrom(connected);
+            ctx.executor().submit(() -> handleBrokerConnected(directProxyHandler, finalConnected));
+        } catch (RejectedExecutionException e) {
+            LOG.error("Event loop was already closed. Closing broker connection.", e);
+            directProxyHandler.close();
+        }
+    }
+
     // According to auth result, send newConnected or newAuthChallenge command.
     private void doAuthentication(AuthData clientData) throws Exception {
         AuthData brokerData = authState.authenticate(clientData);