You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/02/09 18:32:48 UTC
[pulsar] 01/03: [Proxy] Remove unnecessary Pulsar Client usage from Pulsar Proxy (#13836)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 25e6b65e1e4bc20a1d1fc3c875f73982854954da
Author: Lari Hotari <lh...@apache.org>
AuthorDate: Wed Feb 9 15:34:48 2022 +0200
[Proxy] Remove unnecessary Pulsar Client usage from Pulsar Proxy (#13836)
(cherry picked from commit 324aa1bf14d89a93b66ed3613a16c118cf9d4c0f)
---
.../apache/pulsar/client/impl/ConnectionPool.java | 62 ++++++------
.../pulsar/proxy/server/ProxyConnection.java | 105 +++++++++++----------
.../apache/pulsar/proxy/server/ProxyService.java | 11 ---
3 files changed, 81 insertions(+), 97 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 8e28c87..a0ffe5d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -18,11 +18,9 @@
*/
package org.apache.pulsar.client.impl;
-import static org.apache.pulsar.common.util.netty.ChannelFutures.toCompletableFuture;
import static org.apache.pulsar.client.util.MathUtils.signSafeMod;
-
+import static org.apache.pulsar.common.util.netty.ChannelFutures.toCompletableFuture;
import com.google.common.annotations.VisibleForTesting;
-
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
@@ -31,9 +29,6 @@ import io.netty.channel.EventLoopGroup;
import io.netty.resolver.dns.DnsNameResolver;
import io.netty.resolver.dns.DnsNameResolverBuilder;
import io.netty.util.concurrent.Future;
-
-import java.io.Closeable;
-import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
@@ -45,9 +40,7 @@ import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL;
@@ -58,7 +51,7 @@ import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ConnectionPool implements Closeable {
+public class ConnectionPool implements AutoCloseable {
protected final ConcurrentHashMap<InetSocketAddress, ConcurrentMap<Integer, CompletableFuture<ClientCnx>>> pool;
private final Bootstrap bootstrap;
@@ -222,7 +215,7 @@ public class ConnectionPool implements Closeable {
}
/**
- * Resolve DNS asynchronously and attempt to connect to any IP address returned by DNS server
+ * Resolve DNS asynchronously and attempt to connect to any IP address returned by DNS server.
*/
private CompletableFuture<Channel> createConnection(InetSocketAddress unresolvedAddress) {
int port;
@@ -247,27 +240,32 @@ public class ConnectionPool implements Closeable {
}
/**
- * Try to connect to a sequence of IP addresses until a successfull connection can be made, or fail if no address is
- * working
+ * Try to connect to a sequence of IP addresses until a successful connection can be made, or fail if no
+ * address is working.
*/
- private CompletableFuture<Channel> connectToResolvedAddresses(Iterator<InetAddress> unresolvedAddresses, int port, InetSocketAddress sniHost) {
+ private CompletableFuture<Channel> connectToResolvedAddresses(Iterator<InetAddress> unresolvedAddresses,
+ int port,
+ InetSocketAddress sniHost) {
CompletableFuture<Channel> future = new CompletableFuture<>();
// Successfully connected to server
- connectToAddress(unresolvedAddresses.next(), port, sniHost).thenAccept(future::complete).exceptionally(exception -> {
- if (unresolvedAddresses.hasNext()) {
- // Try next IP address
- connectToResolvedAddresses(unresolvedAddresses, port, sniHost).thenAccept(future::complete).exceptionally(ex -> {
- // This is already unwinding the recursive call
- future.completeExceptionally(ex);
+ connectToAddress(unresolvedAddresses.next(), port, sniHost)
+ .thenAccept(future::complete)
+ .exceptionally(exception -> {
+ if (unresolvedAddresses.hasNext()) {
+ // Try next IP address
+ connectToResolvedAddresses(unresolvedAddresses, port, sniHost).thenAccept(future::complete)
+ .exceptionally(ex -> {
+ // This is already unwinding the recursive call
+ future.completeExceptionally(ex);
+ return null;
+ });
+ } else {
+ // Failed to connect to any IP address
+ future.completeExceptionally(exception);
+ }
return null;
});
- } else {
- // Failed to connect to any IP address
- future.completeExceptionally(exception);
- }
- return null;
- });
return future;
}
@@ -285,7 +283,7 @@ public class ConnectionPool implements Closeable {
}
/**
- * Attempt to establish a TCP connection to an already resolved single IP address
+ * Attempt to establish a TCP connection to an already resolved single IP address.
*/
private CompletableFuture<Channel> connectToAddress(InetAddress ipAddress, int port, InetSocketAddress sniHost) {
InetSocketAddress remoteAddress = new InetSocketAddress(ipAddress, port);
@@ -303,7 +301,7 @@ public class ConnectionPool implements Closeable {
if (maxConnectionsPerHosts == 0) {
//Disable pooling
if (cnx.channel().isActive()) {
- if(log.isDebugEnabled()) {
+ if (log.isDebugEnabled()) {
log.debug("close connection due to pooling disabled.");
}
cnx.close();
@@ -312,14 +310,8 @@ public class ConnectionPool implements Closeable {
}
@Override
- public void close() throws IOException {
- try {
- if (!eventLoopGroup.isShutdown()) {
- eventLoopGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS).await();
- }
- } catch (InterruptedException e) {
- log.warn("EventLoopGroup shutdown was interrupted", e);
- }
+ public void close() throws Exception {
+ closeAllConnections();
dnsResolver.close();
}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index f1b7807..ff392ca 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -21,8 +21,9 @@ package org.apache.pulsar.proxy.server;
import static com.google.common.base.Preconditions.checkArgument;
import java.net.SocketAddress;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
-
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
@@ -34,11 +35,9 @@ import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarChannelInitializer;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.protocol.Commands;
@@ -68,8 +67,9 @@ import lombok.Getter;
*/
public class ProxyConnection extends PulsarHandler implements FutureListener<Void> {
// ConnectionPool is used by the proxy to issue lookup requests
- private PulsarClientImpl client;
private ConnectionPool connectionPool;
+ private final AtomicLong requestIdGenerator =
+ new AtomicLong(ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE / 2));
private ProxyService service;
AuthenticationDataSource authenticationData;
private State state;
@@ -113,7 +113,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
}
ConnectionPool getConnectionPool() {
- return client.getCnxPool();
+ return connectionPool;
}
public ProxyConnection(ProxyService proxyService, Supplier<SslHandler> sslHandlerSupplier) {
@@ -130,7 +130,6 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
if (ProxyService.activeConnections.get() > service.getConfiguration().getMaxConcurrentInboundConnections()) {
ctx.close();
ProxyService.rejectedConnections.inc();
- return;
}
}
@@ -149,26 +148,27 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
}
@Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ public synchronized void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
if (directProxyHandler != null && directProxyHandler.outboundChannel != null) {
directProxyHandler.outboundChannel.close();
+ directProxyHandler = null;
}
- if (client != null) {
- client.close();
- }
service.getClientCnxs().remove(this);
LOG.info("[{}] Connection closed", remoteAddress);
if (connectionPool != null) {
try {
connectionPool.close();
+ connectionPool = null;
} catch (Exception e) {
LOG.error("Failed to close connection pool {}", e.getMessage(), e);
}
}
+
+ state = State.Closed;
}
@Override
@@ -222,7 +222,30 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
}
}
- private void completeConnect() {
+ private synchronized void completeConnect(AuthData clientData) throws PulsarClientException {
+ if (service.getConfiguration().isAuthenticationEnabled()) {
+ if (service.getConfiguration().isForwardAuthorizationCredentials()) {
+ this.clientAuthData = clientData;
+ this.clientAuthMethod = authMethod;
+ }
+ if (this.connectionPool == null) {
+ this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
+ () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData,
+ clientAuthMethod, protocolVersionToAdvertise));
+ } else {
+ LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {} role {}",
+ remoteAddress, state, clientAuthRole);
+ }
+ } else {
+ if (this.connectionPool == null) {
+ this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
+ () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise));
+ } else {
+ LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {}",
+ remoteAddress, state);
+ }
+ }
+
LOG.info("[{}] complete connection, init proxy handler. authenticated with {} role {}, hasProxyToBrokerUrl: {}",
remoteAddress, authMethod, clientAuthRole, hasProxyToBrokerUrl);
if (hasProxyToBrokerUrl) {
@@ -242,17 +265,6 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
}
}
- private void createClientAndCompleteConnect(AuthData clientData)
- throws PulsarClientException {
- if (service.getConfiguration().isForwardAuthorizationCredentials()) {
- this.clientAuthData = clientData;
- this.clientAuthMethod = authMethod;
- }
- this.client = createClient(clientConf, this.clientAuthData, this.clientAuthMethod, protocolVersionToAdvertise);
-
- completeConnect();
- }
-
// According to auth result, send newConnected or newAuthChallenge command.
private void doAuthentication(AuthData clientData) throws Exception {
AuthData brokerData = authState.authenticate(clientData);
@@ -263,7 +275,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
LOG.debug("[{}] Client successfully authenticated with {} role {}",
remoteAddress, authMethod, clientAuthRole);
}
- createClientAndCompleteConnect(clientData);
+ completeConnect(clientData);
return;
}
@@ -274,7 +286,6 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
remoteAddress, authMethod);
}
state = State.Connecting;
- return;
}
@Override
@@ -302,16 +313,10 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
try {
// init authn
this.clientConf = createClientConfiguration();
- int protocolVersion = getProtocolVersionToAdvertise(connect);
// authn not enabled, complete
if (!service.getConfiguration().isAuthenticationEnabled()) {
- this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
- () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersion));
- this.client =
- new PulsarClientImpl(clientConf, service.getWorkerGroup(), connectionPool, service.getTimer());
-
- completeConnect();
+ completeConnect(null);
return;
}
@@ -336,7 +341,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
.orElseThrow(() ->
new AuthenticationException("No anonymous role, and no authentication provider configured"));
- createClientAndCompleteConnect(clientData);
+ completeConnect(clientData);
return;
}
@@ -354,7 +359,6 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
LOG.warn("[{}] Unable to authenticate: ", remoteAddress, e);
ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, "Failed to authenticate"));
close();
- return;
}
}
@@ -409,19 +413,26 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
lookupProxyHandler.handleLookup(lookup);
}
- private void close() {
- state = State.Closed;
- ctx.close();
- try {
- if (client != null) {
- client.close();
+ private synchronized void close() {
+ if (state != State.Closed) {
+ state = State.Closed;
+ if (directProxyHandler != null && directProxyHandler.outboundChannel != null) {
+ directProxyHandler.outboundChannel.close();
+ directProxyHandler = null;
+ }
+ if (connectionPool != null) {
+ try {
+ connectionPool.close();
+ connectionPool = null;
+ } catch (Exception e) {
+ LOG.error("Error closing connection pool", e);
+ }
}
- } catch (PulsarClientException e) {
- LOG.error("Unable to close pulsar client - {}. Error - {}", client, e.getMessage());
+ ctx.close();
}
}
- ClientConfigurationData createClientConfiguration() throws UnsupportedAuthenticationException {
+ ClientConfigurationData createClientConfiguration() {
ClientConfigurationData clientConf = new ClientConfigurationData();
clientConf.setServiceUrl(service.getServiceUrl());
ProxyConfiguration proxyConfig = service.getConfiguration();
@@ -441,20 +452,12 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
return clientConf;
}
- private PulsarClientImpl createClient(final ClientConfigurationData clientConf, final AuthData clientAuthData,
- final String clientAuthMethod, final int protocolVersion) throws PulsarClientException {
- this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
- () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData,
- clientAuthMethod, protocolVersion));
- return new PulsarClientImpl(clientConf, service.getWorkerGroup(), connectionPool, service.getTimer());
- }
-
private static int getProtocolVersionToAdvertise(CommandConnect connect) {
return Math.min(connect.getProtocolVersion(), Commands.getCurrentProtocolVersion());
}
long newRequestId() {
- return client.newRequestId();
+ return requestIdGenerator.getAndIncrement();
}
public Authentication getClientAuthentication() {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index abc7a5f..c5e04b6 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -27,8 +27,6 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
-import io.netty.util.HashedWheelTimer;
-import io.netty.util.Timer;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import lombok.Getter;
@@ -75,7 +73,6 @@ public class ProxyService implements Closeable {
private final ProxyConfiguration proxyConfig;
private final Authentication proxyClientAuthentication;
- private final Timer timer;
private String serviceUrl;
private String serviceUrlTls;
private ConfigurationMetadataCacheService configurationCacheService;
@@ -135,7 +132,6 @@ public class ProxyService implements Closeable {
AuthenticationService authenticationService) throws IOException {
checkNotNull(proxyConfig);
this.proxyConfig = proxyConfig;
- this.timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-timer", Thread.currentThread().isDaemon()), 1, TimeUnit.MILLISECONDS);
this.clientCnxs = Sets.newConcurrentHashSet();
this.topicStats = Maps.newConcurrentMap();
@@ -275,9 +271,6 @@ public class ProxyService implements Closeable {
}
acceptorGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
- if (timer != null) {
- timer.stop();
- }
}
public String getServiceUrl() {
@@ -292,10 +285,6 @@ public class ProxyService implements Closeable {
return proxyConfig;
}
- public Timer getTimer() {
- return timer;
- }
-
public AuthenticationService getAuthenticationService() {
return authenticationService;
}