You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/05/20 15:05:52 UTC
[pulsar] 10/31: [Proxy/Client] Fix DNS server denial-of-service issue when DNS entry expires (#15403)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 352bb76dfabe04f4b75df31e4680e8af47e02e13
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Tue May 3 05:59:19 2022 +0300
[Proxy/Client] Fix DNS server denial-of-service issue when DNS entry expires (#15403)
(cherry picked from commit 40d71691dab2a09d3457f8fa638b19ebc2e28dd7)
---
.../pulsar/client/impl/ConnectionPoolTest.java | 38 ++++++++--------
.../apache/pulsar/client/impl/ConnectionPool.java | 53 +++++++++++-----------
.../pulsar/proxy/server/ProxyConnection.java | 11 +++--
.../apache/pulsar/proxy/server/ProxyService.java | 12 ++---
.../proxy/server/ServiceChannelInitializer.java | 2 +-
5 files changed, 60 insertions(+), 56 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
index 0193f592c75..9fbea7f2914 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
@@ -22,6 +22,10 @@ import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructor
import com.google.common.collect.Lists;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.IntStream;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
@@ -31,22 +35,18 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.IntStream;
-
@Test(groups = "broker-impl")
public class ConnectionPoolTest extends MockedPulsarServiceBaseTest {
String serviceUrl;
+ int brokerPort;
@BeforeClass
@Override
protected void setup() throws Exception {
super.internalSetup();
- serviceUrl = "pulsar://non-existing-dns-name:" + pulsar.getBrokerListenPort().get();
+ brokerPort = pulsar.getBrokerListenPort().get();
+ serviceUrl = "pulsar://non-existing-dns-name:" + brokerPort;
}
@AfterClass(alwaysRun = true)
@@ -63,9 +63,11 @@ public class ConnectionPoolTest extends MockedPulsarServiceBaseTest {
conf.setServiceUrl(serviceUrl);
PulsarClientImpl client = new PulsarClientImpl(conf, eventLoop, pool);
- List<InetAddress> result = Lists.newArrayList();
- result.add(InetAddress.getByName("127.0.0.1"));
- Mockito.when(pool.resolveName("non-existing-dns-name")).thenReturn(CompletableFuture.completedFuture(result));
+ List<InetSocketAddress> result = Lists.newArrayList();
+ result.add(new InetSocketAddress("127.0.0.1", brokerPort));
+ Mockito.when(pool.resolveName(InetSocketAddress.createUnresolved("non-existing-dns-name",
+ brokerPort)))
+ .thenReturn(CompletableFuture.completedFuture(result));
client.newProducer().topic("persistent://sample/standalone/ns/my-topic").create();
@@ -75,20 +77,20 @@ public class ConnectionPoolTest extends MockedPulsarServiceBaseTest {
@Test
public void testDoubleIpAddress() throws Exception {
- String serviceUrl = "pulsar://non-existing-dns-name:" + pulsar.getBrokerListenPort().get();
-
ClientConfigurationData conf = new ClientConfigurationData();
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, new DefaultThreadFactory("test"));
ConnectionPool pool = spyWithClassAndConstructorArgs(ConnectionPool.class, conf, eventLoop);
conf.setServiceUrl(serviceUrl);
PulsarClientImpl client = new PulsarClientImpl(conf, eventLoop, pool);
- List<InetAddress> result = Lists.newArrayList();
+ List<InetSocketAddress> result = Lists.newArrayList();
// Add a non existent IP to the response to check that we're trying the 2nd address as well
- result.add(InetAddress.getByName("127.0.0.99"));
- result.add(InetAddress.getByName("127.0.0.1"));
- Mockito.when(pool.resolveName("non-existing-dns-name")).thenReturn(CompletableFuture.completedFuture(result));
+ result.add(new InetSocketAddress("127.0.0.99", brokerPort));
+ result.add(new InetSocketAddress("127.0.0.1", brokerPort));
+ Mockito.when(pool.resolveName(InetSocketAddress.createUnresolved("non-existing-dns-name",
+ brokerPort)))
+ .thenReturn(CompletableFuture.completedFuture(result));
// Create producer should succeed by trying the 2nd IP
client.newProducer().topic("persistent://sample/standalone/ns/my-topic").create();
@@ -105,7 +107,7 @@ public class ConnectionPoolTest extends MockedPulsarServiceBaseTest {
ConnectionPool pool = spyWithClassAndConstructorArgs(ConnectionPool.class, conf, eventLoop);
InetSocketAddress brokerAddress =
- InetSocketAddress.createUnresolved("127.0.0.1", pulsar.getBrokerListenPort().get());
+ InetSocketAddress.createUnresolved("127.0.0.1", brokerPort);
IntStream.range(1, 5).forEach(i -> {
pool.getConnection(brokerAddress).thenAccept(cnx -> {
Assert.assertTrue(cnx.channel().isActive());
@@ -127,7 +129,7 @@ public class ConnectionPoolTest extends MockedPulsarServiceBaseTest {
ConnectionPool pool = spyWithClassAndConstructorArgs(ConnectionPool.class, conf, eventLoop);
InetSocketAddress brokerAddress =
- InetSocketAddress.createUnresolved("127.0.0.1", pulsar.getBrokerListenPort().get());
+ InetSocketAddress.createUnresolved("127.0.0.1", brokerPort);
IntStream.range(1, 10).forEach(i -> {
pool.getConnection(brokerAddress).thenAccept(cnx -> {
Assert.assertTrue(cnx.channel().isActive());
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index edb2a983f25..1e3331f66c2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -26,10 +26,10 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
-import io.netty.resolver.dns.DnsNameResolver;
+import io.netty.resolver.AddressResolver;
+import io.netty.resolver.dns.DnsAddressResolverGroup;
import io.netty.resolver.dns.DnsNameResolverBuilder;
import io.netty.util.concurrent.Future;
-import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
@@ -63,7 +63,7 @@ public class ConnectionPool implements AutoCloseable {
private final int maxConnectionsPerHosts;
private final boolean isSniProxy;
- protected final DnsNameResolver dnsResolver;
+ protected final AddressResolver<InetSocketAddress> addressResolver;
private final boolean shouldCloseDnsResolver;
public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
@@ -76,7 +76,8 @@ public class ConnectionPool implements AutoCloseable {
}
public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup,
- Supplier<ClientCnx> clientCnxSupplier, Optional<DnsNameResolver> dnsNameResolver)
+ Supplier<ClientCnx> clientCnxSupplier,
+ Optional<AddressResolver<InetSocketAddress>> addressResolver)
throws PulsarClientException {
this.eventLoopGroup = eventLoopGroup;
this.clientConfig = conf;
@@ -101,12 +102,13 @@ public class ConnectionPool implements AutoCloseable {
throw new PulsarClientException(e);
}
- this.shouldCloseDnsResolver = !dnsNameResolver.isPresent();
- this.dnsResolver = dnsNameResolver.orElseGet(() -> createDnsNameResolver(conf, eventLoopGroup));
+ this.shouldCloseDnsResolver = !addressResolver.isPresent();
+ this.addressResolver = addressResolver.orElseGet(() -> createAddressResolver(conf, eventLoopGroup));
}
- private static DnsNameResolver createDnsNameResolver(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) {
- DnsNameResolverBuilder dnsNameResolverBuilder = new DnsNameResolverBuilder(eventLoopGroup.next())
+ private static AddressResolver<InetSocketAddress> createAddressResolver(ClientConfigurationData conf,
+ EventLoopGroup eventLoopGroup) {
+ DnsNameResolverBuilder dnsNameResolverBuilder = new DnsNameResolverBuilder()
.traceEnabled(true).channelType(EventLoopUtil.getDatagramChannelClass(eventLoopGroup));
if (conf.getDnsLookupBindAddress() != null) {
InetSocketAddress addr = new InetSocketAddress(conf.getDnsLookupBindAddress(),
@@ -114,7 +116,10 @@ public class ConnectionPool implements AutoCloseable {
dnsNameResolverBuilder.localAddress(addr);
}
DnsResolverUtil.applyJdkDnsCacheSettings(dnsNameResolverBuilder);
- return dnsNameResolverBuilder.build();
+ // use DnsAddressResolverGroup to create the AddressResolver since it contains a solution
+ // to prevent cache stampede / thundering herds problem when a DNS entry expires while the system
+ // is under high load
+ return new DnsAddressResolverGroup(dnsNameResolverBuilder).getResolver(eventLoopGroup.next());
}
private static final Random random = new Random();
@@ -239,19 +244,17 @@ public class ConnectionPool implements AutoCloseable {
* Resolve DNS asynchronously and attempt to connect to any IP address returned by DNS server.
*/
private CompletableFuture<Channel> createConnection(InetSocketAddress unresolvedAddress) {
- int port;
- CompletableFuture<List<InetAddress>> resolvedAddress;
+ CompletableFuture<List<InetSocketAddress>> resolvedAddress;
try {
if (isSniProxy) {
URI proxyURI = new URI(clientConfig.getProxyServiceUrl());
- port = proxyURI.getPort();
- resolvedAddress = resolveName(proxyURI.getHost());
+ resolvedAddress =
+ resolveName(InetSocketAddress.createUnresolved(proxyURI.getHost(), proxyURI.getPort()));
} else {
- port = unresolvedAddress.getPort();
- resolvedAddress = resolveName(unresolvedAddress.getHostString());
+ resolvedAddress = resolveName(unresolvedAddress);
}
return resolvedAddress.thenCompose(
- inetAddresses -> connectToResolvedAddresses(inetAddresses.iterator(), port,
+ inetAddresses -> connectToResolvedAddresses(inetAddresses.iterator(),
isSniProxy ? unresolvedAddress : null));
} catch (URISyntaxException e) {
log.error("Invalid Proxy url {}", clientConfig.getProxyServiceUrl(), e);
@@ -264,18 +267,17 @@ public class ConnectionPool implements AutoCloseable {
* Try to connect to a sequence of IP addresses until a successful connection can be made, or fail if no
* address is working.
*/
- private CompletableFuture<Channel> connectToResolvedAddresses(Iterator<InetAddress> unresolvedAddresses,
- int port,
+ private CompletableFuture<Channel> connectToResolvedAddresses(Iterator<InetSocketAddress> unresolvedAddresses,
InetSocketAddress sniHost) {
CompletableFuture<Channel> future = new CompletableFuture<>();
// Successfully connected to server
- connectToAddress(unresolvedAddresses.next(), port, sniHost)
+ connectToAddress(unresolvedAddresses.next(), sniHost)
.thenAccept(future::complete)
.exceptionally(exception -> {
if (unresolvedAddresses.hasNext()) {
// Try next IP address
- connectToResolvedAddresses(unresolvedAddresses, port, sniHost).thenAccept(future::complete)
+ connectToResolvedAddresses(unresolvedAddresses, sniHost).thenAccept(future::complete)
.exceptionally(ex -> {
// This is already unwinding the recursive call
future.completeExceptionally(ex);
@@ -291,9 +293,9 @@ public class ConnectionPool implements AutoCloseable {
return future;
}
- CompletableFuture<List<InetAddress>> resolveName(String hostname) {
- CompletableFuture<List<InetAddress>> future = new CompletableFuture<>();
- dnsResolver.resolveAll(hostname).addListener((Future<List<InetAddress>> resolveFuture) -> {
+ CompletableFuture<List<InetSocketAddress>> resolveName(InetSocketAddress unresolvedAddress) {
+ CompletableFuture<List<InetSocketAddress>> future = new CompletableFuture<>();
+ addressResolver.resolveAll(unresolvedAddress).addListener((Future<List<InetSocketAddress>> resolveFuture) -> {
if (resolveFuture.isSuccess()) {
future.complete(resolveFuture.get());
} else {
@@ -306,8 +308,7 @@ public class ConnectionPool implements AutoCloseable {
/**
* Attempt to establish a TCP connection to an already resolved single IP address.
*/
- private CompletableFuture<Channel> connectToAddress(InetAddress ipAddress, int port, InetSocketAddress sniHost) {
- InetSocketAddress remoteAddress = new InetSocketAddress(ipAddress, port);
+ private CompletableFuture<Channel> connectToAddress(InetSocketAddress remoteAddress, InetSocketAddress sniHost) {
if (clientConfig.isUseTls()) {
return toCompletableFuture(bootstrap.register())
.thenCompose(channel -> channelInitializerHandler
@@ -337,7 +338,7 @@ public class ConnectionPool implements AutoCloseable {
public void close() throws Exception {
closeAllConnections();
if (shouldCloseDnsResolver) {
- dnsResolver.close();
+ addressResolver.close();
}
}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 33dc1abd88a..0d80c2e473a 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -26,7 +26,7 @@ import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.haproxy.HAProxyMessage;
import io.netty.handler.ssl.SslHandler;
-import io.netty.resolver.dns.DnsNameResolver;
+import io.netty.resolver.dns.DnsAddressResolverGroup;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Collections;
@@ -77,7 +77,7 @@ public class ProxyConnection extends PulsarHandler {
private final AtomicLong requestIdGenerator =
new AtomicLong(ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE / 2));
private final ProxyService service;
- private final DnsNameResolver dnsNameResolver;
+ private final DnsAddressResolverGroup dnsAddressResolverGroup;
AuthenticationDataSource authenticationData;
private State state;
private final Supplier<SslHandler> sslHandlerSupplier;
@@ -130,10 +130,10 @@ public class ProxyConnection extends PulsarHandler {
}
public ProxyConnection(ProxyService proxyService, Supplier<SslHandler> sslHandlerSupplier,
- DnsNameResolver dnsNameResolver) {
+ DnsAddressResolverGroup dnsAddressResolverGroup) {
super(30, TimeUnit.SECONDS);
this.service = proxyService;
- this.dnsNameResolver = dnsNameResolver;
+ this.dnsAddressResolverGroup = dnsAddressResolverGroup;
this.state = State.Init;
this.sslHandlerSupplier = sslHandlerSupplier;
this.brokerProxyValidator = service.getBrokerProxyValidator();
@@ -276,7 +276,8 @@ public class ProxyConnection extends PulsarHandler {
if (this.connectionPool == null) {
this.connectionPool = new ConnectionPool(clientConf, service.getWorkerGroup(),
- clientCnxSupplier, Optional.of(dnsNameResolver));
+ clientCnxSupplier,
+ Optional.of(dnsAddressResolverGroup.getResolver(service.getWorkerGroup().next())));
} else {
LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {} role {}",
remoteAddress, state, clientAuthRole);
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 10b99aeaff1..1960b5143a0 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -28,7 +28,7 @@ import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
-import io.netty.resolver.dns.DnsNameResolver;
+import io.netty.resolver.dns.DnsAddressResolverGroup;
import io.netty.resolver.dns.DnsNameResolverBuilder;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.prometheus.client.Counter;
@@ -80,7 +80,7 @@ public class ProxyService implements Closeable {
private final ProxyConfiguration proxyConfig;
private final Authentication proxyClientAuthentication;
@Getter
- private final DnsNameResolver dnsNameResolver;
+ private final DnsAddressResolverGroup dnsAddressResolverGroup;
@Getter
private final BrokerProxyValidator brokerProxyValidator;
private String serviceUrl;
@@ -162,13 +162,13 @@ public class ProxyService implements Closeable {
false, workersThreadFactory);
this.authenticationService = authenticationService;
- DnsNameResolverBuilder dnsNameResolverBuilder = new DnsNameResolverBuilder(workerGroup.next())
+ DnsNameResolverBuilder dnsNameResolverBuilder = new DnsNameResolverBuilder()
.channelType(EventLoopUtil.getDatagramChannelClass(workerGroup));
DnsResolverUtil.applyJdkDnsCacheSettings(dnsNameResolverBuilder);
- dnsNameResolver = dnsNameResolverBuilder.build();
+ dnsAddressResolverGroup = new DnsAddressResolverGroup(dnsNameResolverBuilder);
- brokerProxyValidator = new BrokerProxyValidator(dnsNameResolver.asAddressResolver(),
+ brokerProxyValidator = new BrokerProxyValidator(dnsAddressResolverGroup.getResolver(workerGroup.next()),
proxyConfig.getBrokerProxyAllowedHostNames(),
proxyConfig.getBrokerProxyAllowedIPAddresses(),
proxyConfig.getBrokerProxyAllowedTargetPorts());
@@ -331,7 +331,7 @@ public class ProxyService implements Closeable {
}
public void close() throws IOException {
- dnsNameResolver.close();
+ dnsAddressResolverGroup.close();
if (discoveryProvider != null) {
discoveryProvider.close();
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
index 1a588b481fc..f1fd98bd8f6 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
@@ -173,7 +173,7 @@ public class ServiceChannelInitializer extends ChannelInitializer<SocketChannel>
}
ch.pipeline().addLast("handler",
- new ProxyConnection(proxyService, sslHandlerSupplier, proxyService.getDnsNameResolver()));
+ new ProxyConnection(proxyService, sslHandlerSupplier, proxyService.getDnsAddressResolverGroup()));
}
}