You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/02/09 18:32:48 UTC

[pulsar] 01/03: [Proxy] Remove unnecessary Pulsar Client usage from Pulsar Proxy (#13836)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 25e6b65e1e4bc20a1d1fc3c875f73982854954da
Author: Lari Hotari <lh...@apache.org>
AuthorDate: Wed Feb 9 15:34:48 2022 +0200

    [Proxy] Remove unnecessary Pulsar Client usage from Pulsar Proxy (#13836)
    
    (cherry picked from commit 324aa1bf14d89a93b66ed3613a16c118cf9d4c0f)
---
 .../apache/pulsar/client/impl/ConnectionPool.java  |  62 ++++++------
 .../pulsar/proxy/server/ProxyConnection.java       | 105 +++++++++++----------
 .../apache/pulsar/proxy/server/ProxyService.java   |  11 ---
 3 files changed, 81 insertions(+), 97 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 8e28c87..a0ffe5d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -18,11 +18,9 @@
  */
 package org.apache.pulsar.client.impl;
 
-import static org.apache.pulsar.common.util.netty.ChannelFutures.toCompletableFuture;
 import static org.apache.pulsar.client.util.MathUtils.signSafeMod;
-
+import static org.apache.pulsar.common.util.netty.ChannelFutures.toCompletableFuture;
 import com.google.common.annotations.VisibleForTesting;
-
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelException;
@@ -31,9 +29,6 @@ import io.netty.channel.EventLoopGroup;
 import io.netty.resolver.dns.DnsNameResolver;
 import io.netty.resolver.dns.DnsNameResolverBuilder;
 import io.netty.util.concurrent.Future;
-
-import java.io.Closeable;
-import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URI;
@@ -45,9 +40,7 @@ import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL;
@@ -58,7 +51,7 @@ import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ConnectionPool implements Closeable {
+public class ConnectionPool implements AutoCloseable {
     protected final ConcurrentHashMap<InetSocketAddress, ConcurrentMap<Integer, CompletableFuture<ClientCnx>>> pool;
 
     private final Bootstrap bootstrap;
@@ -222,7 +215,7 @@ public class ConnectionPool implements Closeable {
     }
 
     /**
-     * Resolve DNS asynchronously and attempt to connect to any IP address returned by DNS server
+     * Resolve DNS asynchronously and attempt to connect to any IP address returned by DNS server.
      */
     private CompletableFuture<Channel> createConnection(InetSocketAddress unresolvedAddress) {
         int port;
@@ -247,27 +240,32 @@ public class ConnectionPool implements Closeable {
     }
 
     /**
-     * Try to connect to a sequence of IP addresses until a successfull connection can be made, or fail if no address is
-     * working
+     * Try to connect to a sequence of IP addresses until a successful connection can be made, or fail if no
+     * address is working.
      */
-    private CompletableFuture<Channel> connectToResolvedAddresses(Iterator<InetAddress> unresolvedAddresses, int port, InetSocketAddress sniHost) {
+    private CompletableFuture<Channel> connectToResolvedAddresses(Iterator<InetAddress> unresolvedAddresses,
+                                                                  int port,
+                                                                  InetSocketAddress sniHost) {
         CompletableFuture<Channel> future = new CompletableFuture<>();
 
         // Successfully connected to server
-        connectToAddress(unresolvedAddresses.next(), port, sniHost).thenAccept(future::complete).exceptionally(exception -> {
-            if (unresolvedAddresses.hasNext()) {
-                // Try next IP address
-                connectToResolvedAddresses(unresolvedAddresses, port, sniHost).thenAccept(future::complete).exceptionally(ex -> {
-                    // This is already unwinding the recursive call
-                    future.completeExceptionally(ex);
+        connectToAddress(unresolvedAddresses.next(), port, sniHost)
+                .thenAccept(future::complete)
+                .exceptionally(exception -> {
+                    if (unresolvedAddresses.hasNext()) {
+                        // Try next IP address
+                        connectToResolvedAddresses(unresolvedAddresses, port, sniHost).thenAccept(future::complete)
+                                .exceptionally(ex -> {
+                                    // This is already unwinding the recursive call
+                                    future.completeExceptionally(ex);
+                                    return null;
+                                });
+                    } else {
+                        // Failed to connect to any IP address
+                        future.completeExceptionally(exception);
+                    }
                     return null;
                 });
-            } else {
-                // Failed to connect to any IP address
-                future.completeExceptionally(exception);
-            }
-            return null;
-        });
 
         return future;
     }
@@ -285,7 +283,7 @@ public class ConnectionPool implements Closeable {
     }
 
     /**
-     * Attempt to establish a TCP connection to an already resolved single IP address
+     * Attempt to establish a TCP connection to an already resolved single IP address.
      */
     private CompletableFuture<Channel> connectToAddress(InetAddress ipAddress, int port, InetSocketAddress sniHost) {
         InetSocketAddress remoteAddress = new InetSocketAddress(ipAddress, port);
@@ -303,7 +301,7 @@ public class ConnectionPool implements Closeable {
         if (maxConnectionsPerHosts == 0) {
             //Disable pooling
             if (cnx.channel().isActive()) {
-                if(log.isDebugEnabled()) {
+                if (log.isDebugEnabled()) {
                     log.debug("close connection due to pooling disabled.");
                 }
                 cnx.close();
@@ -312,14 +310,8 @@ public class ConnectionPool implements Closeable {
     }
 
     @Override
-    public void close() throws IOException {
-        try {
-            if (!eventLoopGroup.isShutdown()) {
-                eventLoopGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS).await();
-            }
-        } catch (InterruptedException e) {
-            log.warn("EventLoopGroup shutdown was interrupted", e);
-        }
+    public void close() throws Exception {
+        closeAllConnections();
         dnsResolver.close();
     }
 
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index f1b7807..ff392ca 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -21,8 +21,9 @@ package org.apache.pulsar.proxy.server;
 import static com.google.common.base.Preconditions.checkArgument;
 
 import java.net.SocketAddress;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
-
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 import javax.naming.AuthenticationException;
 import javax.net.ssl.SSLSession;
@@ -34,11 +35,9 @@ import org.apache.pulsar.broker.authentication.AuthenticationState;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
 import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.ConnectionPool;
 import org.apache.pulsar.client.impl.PulsarChannelInitializer;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.api.AuthData;
 import org.apache.pulsar.common.protocol.Commands;
@@ -68,8 +67,9 @@ import lombok.Getter;
  */
 public class ProxyConnection extends PulsarHandler implements FutureListener<Void> {
     // ConnectionPool is used by the proxy to issue lookup requests
-    private PulsarClientImpl client;
     private ConnectionPool connectionPool;
+    private final AtomicLong requestIdGenerator =
+            new AtomicLong(ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE / 2));
     private ProxyService service;
     AuthenticationDataSource authenticationData;
     private State state;
@@ -113,7 +113,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
     }
 
     ConnectionPool getConnectionPool() {
-        return client.getCnxPool();
+        return connectionPool;
     }
 
     public ProxyConnection(ProxyService proxyService, Supplier<SslHandler> sslHandlerSupplier) {
@@ -130,7 +130,6 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
         if (ProxyService.activeConnections.get() > service.getConfiguration().getMaxConcurrentInboundConnections()) {
             ctx.close();
             ProxyService.rejectedConnections.inc();
-            return;
         }
     }
 
@@ -149,26 +148,27 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
     }
 
     @Override
-    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+    public synchronized void channelInactive(ChannelHandlerContext ctx) throws Exception {
         super.channelInactive(ctx);
 
         if (directProxyHandler != null && directProxyHandler.outboundChannel != null) {
             directProxyHandler.outboundChannel.close();
+            directProxyHandler = null;
         }
 
-        if (client != null) {
-            client.close();
-        }
         service.getClientCnxs().remove(this);
         LOG.info("[{}] Connection closed", remoteAddress);
 
         if (connectionPool != null) {
             try {
                 connectionPool.close();
+                connectionPool = null;
             } catch (Exception e) {
                 LOG.error("Failed to close connection pool {}", e.getMessage(), e);
             }
         }
+
+        state = State.Closed;
     }
 
     @Override
@@ -222,7 +222,30 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
         }
     }
 
-    private void completeConnect() {
+    private synchronized void completeConnect(AuthData clientData) throws PulsarClientException {
+        if (service.getConfiguration().isAuthenticationEnabled()) {
+            if (service.getConfiguration().isForwardAuthorizationCredentials()) {
+                this.clientAuthData = clientData;
+                this.clientAuthMethod = authMethod;
+            }
+            if (this.connectionPool == null) {
+                this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
+                        () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData,
+                                clientAuthMethod, protocolVersionToAdvertise));
+            } else {
+                LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {} role {}",
+                        remoteAddress, state, clientAuthRole);
+            }
+        } else {
+            if (this.connectionPool == null) {
+                this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
+                        () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise));
+            } else {
+                LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {}",
+                        remoteAddress, state);
+            }
+        }
+
         LOG.info("[{}] complete connection, init proxy handler. authenticated with {} role {}, hasProxyToBrokerUrl: {}",
             remoteAddress, authMethod, clientAuthRole, hasProxyToBrokerUrl);
         if (hasProxyToBrokerUrl) {
@@ -242,17 +265,6 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
         }
     }
 
-    private void createClientAndCompleteConnect(AuthData clientData)
-        throws PulsarClientException {
-        if (service.getConfiguration().isForwardAuthorizationCredentials()) {
-            this.clientAuthData = clientData;
-            this.clientAuthMethod = authMethod;
-        }
-        this.client = createClient(clientConf, this.clientAuthData, this.clientAuthMethod, protocolVersionToAdvertise);
-
-        completeConnect();
-    }
-
     // According to auth result, send newConnected or newAuthChallenge command.
     private void doAuthentication(AuthData clientData) throws Exception {
         AuthData brokerData = authState.authenticate(clientData);
@@ -263,7 +275,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
                 LOG.debug("[{}] Client successfully authenticated with {} role {}",
                     remoteAddress, authMethod, clientAuthRole);
             }
-            createClientAndCompleteConnect(clientData);
+            completeConnect(clientData);
             return;
         }
 
@@ -274,7 +286,6 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
                 remoteAddress, authMethod);
         }
         state = State.Connecting;
-        return;
     }
 
     @Override
@@ -302,16 +313,10 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
         try {
             // init authn
             this.clientConf = createClientConfiguration();
-            int protocolVersion = getProtocolVersionToAdvertise(connect);
 
             // authn not enabled, complete
             if (!service.getConfiguration().isAuthenticationEnabled()) {
-                this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
-                        () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersion));
-                this.client =
-                        new PulsarClientImpl(clientConf, service.getWorkerGroup(), connectionPool, service.getTimer());
-
-                completeConnect();
+                completeConnect(null);
                 return;
             }
 
@@ -336,7 +341,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
                     .orElseThrow(() ->
                         new AuthenticationException("No anonymous role, and no authentication provider configured"));
 
-                createClientAndCompleteConnect(clientData);
+                completeConnect(clientData);
                 return;
             }
 
@@ -354,7 +359,6 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
             LOG.warn("[{}] Unable to authenticate: ", remoteAddress, e);
             ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, "Failed to authenticate"));
             close();
-            return;
         }
     }
 
@@ -409,19 +413,26 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
         lookupProxyHandler.handleLookup(lookup);
     }
 
-    private void close() {
-        state = State.Closed;
-        ctx.close();
-        try {
-            if (client != null) {
-                client.close();
+    private synchronized void close() {
+        if (state != State.Closed) {
+            state = State.Closed;
+            if (directProxyHandler != null && directProxyHandler.outboundChannel != null) {
+                directProxyHandler.outboundChannel.close();
+                directProxyHandler = null;
+            }
+            if (connectionPool != null) {
+                try {
+                    connectionPool.close();
+                    connectionPool = null;
+                } catch (Exception e) {
+                    LOG.error("Error closing connection pool", e);
+                }
             }
-        } catch (PulsarClientException e) {
-            LOG.error("Unable to close pulsar client - {}. Error - {}", client, e.getMessage());
+            ctx.close();
         }
     }
 
-    ClientConfigurationData createClientConfiguration() throws UnsupportedAuthenticationException {
+    ClientConfigurationData createClientConfiguration() {
         ClientConfigurationData clientConf = new ClientConfigurationData();
         clientConf.setServiceUrl(service.getServiceUrl());
         ProxyConfiguration proxyConfig = service.getConfiguration();
@@ -441,20 +452,12 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
         return clientConf;
     }
 
-    private PulsarClientImpl createClient(final ClientConfigurationData clientConf, final AuthData clientAuthData,
-            final String clientAuthMethod, final int protocolVersion) throws PulsarClientException {
-        this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
-                () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData,
-                        clientAuthMethod, protocolVersion));
-        return new PulsarClientImpl(clientConf, service.getWorkerGroup(), connectionPool, service.getTimer());
-    }
-
     private static int getProtocolVersionToAdvertise(CommandConnect connect) {
         return Math.min(connect.getProtocolVersion(), Commands.getCurrentProtocolVersion());
     }
 
     long newRequestId() {
-        return client.newRequestId();
+        return requestIdGenerator.getAndIncrement();
     }
 
     public Authentication getClientAuthentication() {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index abc7a5f..c5e04b6 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -27,8 +27,6 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
 import io.netty.util.concurrent.DefaultThreadFactory;
-import io.netty.util.HashedWheelTimer;
-import io.netty.util.Timer;
 import io.prometheus.client.Counter;
 import io.prometheus.client.Gauge;
 import lombok.Getter;
@@ -75,7 +73,6 @@ public class ProxyService implements Closeable {
 
     private final ProxyConfiguration proxyConfig;
     private final Authentication proxyClientAuthentication;
-    private final Timer timer;
     private String serviceUrl;
     private String serviceUrlTls;
     private ConfigurationMetadataCacheService configurationCacheService;
@@ -135,7 +132,6 @@ public class ProxyService implements Closeable {
                         AuthenticationService authenticationService) throws IOException {
         checkNotNull(proxyConfig);
         this.proxyConfig = proxyConfig;
-        this.timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-timer", Thread.currentThread().isDaemon()), 1, TimeUnit.MILLISECONDS);
         this.clientCnxs = Sets.newConcurrentHashSet();
         this.topicStats = Maps.newConcurrentMap();
 
@@ -275,9 +271,6 @@ public class ProxyService implements Closeable {
         }
         acceptorGroup.shutdownGracefully();
         workerGroup.shutdownGracefully();
-        if (timer != null) {
-            timer.stop();
-        }
     }
 
     public String getServiceUrl() {
@@ -292,10 +285,6 @@ public class ProxyService implements Closeable {
         return proxyConfig;
     }
 
-    public Timer getTimer() {
-        return timer;
-    }
-
     public AuthenticationService getAuthenticationService() {
         return authenticationService;
     }