You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/02/09 18:30:23 UTC
[pulsar] 02/02: [Proxy] Fix port exhaustion and connection issues in Pulsar Proxy (#14078)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit b3bac91a74ad0e9358c0d5c12f87b89166276c67
Author: Lari Hotari <lh...@apache.org>
AuthorDate: Wed Feb 9 20:24:37 2022 +0200
[Proxy] Fix port exhaustion and connection issues in Pulsar Proxy (#14078)
(cherry picked from commit 640b4e6ec14d9b812da608037c58b664e6778637)
(cherry picked from commit 3d2e6ce84b6e69667a1c2095b766d9941a258b61)
---
distribution/server/src/assemble/LICENSE.bin.txt | 2 +
pom.xml | 1 +
.../ProxySaslAuthenticationTest.java | 1 +
.../pulsar/common/protocol/PulsarHandler.java | 2 +-
pulsar-proxy/pom.xml | 13 +-
.../proxy/server/BrokerDiscoveryProvider.java | 10 +
.../pulsar/proxy/server/BrokerProxyValidator.java | 204 +++++++++++++++++++++
.../pulsar/proxy/server/DirectProxyHandler.java | 148 ++++++++-------
.../pulsar/proxy/server/ProxyConfiguration.java | 39 ++++
.../pulsar/proxy/server/ProxyConnection.java | 101 ++++++++--
.../apache/pulsar/proxy/server/ProxyService.java | 17 ++
.../pulsar/proxy/server/ProxyServiceStarter.java | 3 +
.../proxy/server/ServiceChannelInitializer.java | 8 +
.../proxy/server/TargetAddressDeniedException.java | 26 +++
.../proxy/server/AuthedAdminProxyHandlerTest.java | 1 +
.../proxy/server/BrokerProxyValidatorTest.java | 102 +++++++++++
.../proxy/server/ProxyAdditionalServletTest.java | 1 +
.../ProxyAuthenticatedProducerConsumerTest.java | 1 +
.../proxy/server/ProxyAuthenticationTest.java | 1 +
.../pulsar/proxy/server/ProxyConnectionTest.java | 38 ++++
.../server/ProxyConnectionThrottlingTest.java | 1 +
.../server/ProxyEnableHAProxyProtocolTest.java | 1 +
.../proxy/server/ProxyForwardAuthDataTest.java | 1 +
.../proxy/server/ProxyKeyStoreTlsTestWithAuth.java | 1 +
.../server/ProxyKeyStoreTlsTestWithoutAuth.java | 1 +
.../proxy/server/ProxyLookupThrottlingTest.java | 1 +
.../pulsar/proxy/server/ProxyParserTest.java | 1 +
.../proxy/server/ProxyRolesEnforcementTest.java | 1 +
.../proxy/server/ProxyServiceStarterTest.java | 14 +-
.../proxy/server/ProxyServiceTlsStarterTest.java | 15 +-
.../apache/pulsar/proxy/server/ProxyStatsTest.java | 1 +
.../org/apache/pulsar/proxy/server/ProxyTest.java | 1 +
.../apache/pulsar/proxy/server/ProxyTlsTest.java | 1 +
.../pulsar/proxy/server/ProxyTlsTestWithAuth.java | 1 +
.../server/ProxyWithAuthorizationNegTest.java | 1 +
.../proxy/server/ProxyWithAuthorizationTest.java | 2 +
.../server/ProxyWithJwtAuthorizationTest.java | 1 +
.../server/ProxyWithoutServiceDiscoveryTest.java | 1 +
.../SuperUserAuthedAdminProxyHandlerTest.java | 1 +
.../server/UnauthedAdminProxyHandlerTest.java | 1 +
40 files changed, 682 insertions(+), 85 deletions(-)
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index 9356988..c0b9856 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -512,6 +512,8 @@ The Apache Software License, Version 2.0
- io.vertx-vertx-web-3.5.3.jar
* Apache ZooKeeper
- org.apache.zookeeper-zookeeper-jute-3.5.9.jar
+ * IPAddress
+ - com.github.seancfoley-ipaddress-5.3.3.jar
BSD 3-clause "New" or "Revised" License
* Google auth library
diff --git a/pom.xml b/pom.xml
index aee8f00..f8a7689 100644
--- a/pom.xml
+++ b/pom.xml
@@ -181,6 +181,7 @@ flexible messaging model and an intuitive client API.</description>
<kubernetesclient.version>9.0.2</kubernetesclient.version>
<nsq-client.version>1.0</nsq-client.version>
<docker-java.version>3.2.7</docker-java.version>
+ <seancfoley.ipaddress.version>5.3.3</seancfoley.ipaddress.version>
<!-- test dependencies -->
<cassandra.version>3.6.0</cassandra.version>
diff --git a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
index ae8b925..f27503a 100644
--- a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
+++ b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
@@ -223,6 +223,7 @@ public class ProxySaslAuthenticationTest extends ProducerConsumerBase {
ProxyConfiguration proxyConfig = new ProxyConfiguration();
proxyConfig.setAuthenticationEnabled(true);
proxyConfig.setServicePort(Optional.of(0));
+ proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setWebServicePort(Optional.of(0));
proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
proxyConfig.setSaslJaasClientAllowedIds(".*" + localHostname + ".*");
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
index 7a8d5c0..ea16251 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
@@ -112,7 +112,7 @@ public abstract class PulsarHandler extends PulsarDecoder {
}
}
- protected void cancelKeepAliveTask() {
+ public void cancelKeepAliveTask() {
if (keepAliveTask != null) {
keepAliveTask.cancel(false);
keepAliveTask = null;
diff --git a/pulsar-proxy/pom.xml b/pulsar-proxy/pom.xml
index e1bfdf5..f3ab04d 100644
--- a/pulsar-proxy/pom.xml
+++ b/pulsar-proxy/pom.xml
@@ -19,7 +19,7 @@
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.pulsar</groupId>
@@ -177,5 +177,16 @@
<groupId>com.beust</groupId>
<artifactId>jcommander</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.github.seancfoley</groupId>
+ <artifactId>ipaddress</artifactId>
+ <version>${seancfoley.ipaddress.version}</version>
+ </dependency>
</dependencies>
</project>
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
index 57a7a02..3933880 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
@@ -38,6 +38,7 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
+import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
import org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader;
import org.apache.pulsar.zookeeper.GlobalZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
@@ -83,6 +84,15 @@ public class BrokerDiscoveryProvider implements Closeable {
}
/**
+ * Access the list of available brokers.
+ * @return the list of available brokers
+ * @throws PulsarServerException
+ */
+ public List<? extends ServiceLookupData> getAvailableBrokers() throws PulsarServerException {
+ return localZkCache.getAvailableBrokers();
+ }
+
+ /**
* Find next broker {@link LoadManagerReport} in round-robin fashion.
*
* @return
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerProxyValidator.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerProxyValidator.java
new file mode 100644
index 0000000..3c49653
--- /dev/null
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerProxyValidator.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.proxy.server;
+
+import inet.ipaddr.IPAddress;
+import inet.ipaddr.IPAddressString;
+import inet.ipaddr.ipv4.IPv4Address;
+import inet.ipaddr.ipv6.IPv6Address;
+import io.netty.resolver.AddressResolver;
+import io.netty.util.concurrent.Future;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.StringTokenizer;
+import java.util.concurrent.CompletableFuture;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.util.FutureUtil;
+
+@Slf4j
+public class BrokerProxyValidator {
+ private static final String SEPARATOR = "\\s*,\\s*";
+ private static final String ALLOW_ANY = "*";
+ private final int[] allowedTargetPorts;
+ private final boolean allowAnyTargetPort;
+ private final List<IPAddress> allowedIPAddresses;
+ private final boolean allowAnyIPAddress;
+ private final AddressResolver<InetSocketAddress> inetSocketAddressResolver;
+ private final List<Pattern> allowedHostNames;
+ private final boolean allowAnyHostName;
+
+ public BrokerProxyValidator(AddressResolver<InetSocketAddress> inetSocketAddressResolver, String allowedHostNames,
+ String allowedIPAddresses, String allowedTargetPorts) {
+ this.inetSocketAddressResolver = inetSocketAddressResolver;
+ List<String> allowedHostNamesStrings = parseCommaSeparatedConfigValue(allowedHostNames);
+ if (allowedHostNamesStrings.contains(ALLOW_ANY)) {
+ this.allowAnyHostName = true;
+ this.allowedHostNames = Collections.emptyList();
+ } else {
+ this.allowAnyHostName = false;
+ this.allowedHostNames = allowedHostNamesStrings.stream()
+ .map(BrokerProxyValidator::parseWildcardPattern).collect(Collectors.toList());
+ }
+ List<String> allowedIPAddressesStrings = parseCommaSeparatedConfigValue(allowedIPAddresses);
+ if (allowedIPAddressesStrings.contains(ALLOW_ANY)) {
+ allowAnyIPAddress = true;
+ this.allowedIPAddresses = Collections.emptyList();
+ } else {
+ allowAnyIPAddress = false;
+ this.allowedIPAddresses = allowedIPAddressesStrings.stream().map(IPAddressString::new)
+ .filter(ipAddressString -> {
+ if (ipAddressString.isValid()) {
+ return true;
+ } else {
+ throw new IllegalArgumentException("Invalid IP address filter '" + ipAddressString + "'",
+ ipAddressString.getAddressStringException());
+ }
+ }).map(IPAddressString::getAddress)
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList());
+ }
+ List<String> allowedTargetPortsStrings = parseCommaSeparatedConfigValue(allowedTargetPorts);
+ if (allowedTargetPortsStrings.contains(ALLOW_ANY)) {
+ allowAnyTargetPort = true;
+ this.allowedTargetPorts = new int[0];
+ } else {
+ allowAnyTargetPort = false;
+ this.allowedTargetPorts =
+ allowedTargetPortsStrings.stream().mapToInt(Integer::parseInt).toArray();
+ }
+ }
+
+ private static Pattern parseWildcardPattern(String wildcardPattern) {
+ String regexPattern =
+ Collections.list(new StringTokenizer(wildcardPattern, "*", true))
+ .stream()
+ .map(String::valueOf)
+ .map(token -> {
+ if ("*".equals(token)) {
+ return ".*";
+ } else {
+ return Pattern.quote(token);
+ }
+ }).collect(Collectors.joining());
+ return Pattern.compile(
+ "^" + regexPattern + "$",
+ Pattern.CASE_INSENSITIVE);
+ }
+
+ private static List<String> parseCommaSeparatedConfigValue(String configValue) {
+ return Arrays.stream(configValue.split(SEPARATOR)).map(String::trim).filter(s -> s.length() > 0)
+ .collect(Collectors.toList());
+ }
+
+ public CompletableFuture<InetSocketAddress> resolveAndCheckTargetAddress(String hostAndPort) {
+ int pos = hostAndPort.indexOf(':');
+ String host = hostAndPort.substring(0, pos);
+ int port = Integer.parseInt(hostAndPort.substring(pos + 1));
+ if (!isPortAllowed(port)) {
+ return FutureUtil.failedFuture(
+ new TargetAddressDeniedException("Given port in '" + hostAndPort + "' isn't allowed."));
+ } else if (!isHostAllowed(host)) {
+ return FutureUtil.failedFuture(
+ new TargetAddressDeniedException("Given host in '" + hostAndPort + "' isn't allowed."));
+ } else {
+ return toCompletableFuture(
+ inetSocketAddressResolver.resolve(InetSocketAddress.createUnresolved(host, port)))
+ .thenCompose(resolvedAddress -> {
+ CompletableFuture<InetSocketAddress> result = new CompletableFuture();
+ if (isIPAddressAllowed(resolvedAddress)) {
+ result.complete(resolvedAddress);
+ } else {
+ result.completeExceptionally(new TargetAddressDeniedException(
+ "The IP address of the given host and port '" + hostAndPort + "' isn't allowed."));
+ }
+ return result;
+ });
+ }
+ }
+
+ // backported from NettyFutureUtil.toCompletableFuture
+ private static <V> CompletableFuture<V> toCompletableFuture(Future<V> future) {
+ Objects.requireNonNull(future, "future cannot be null");
+
+ CompletableFuture<V> adapter = new CompletableFuture<>();
+ if (future.isDone()) {
+ if (future.isSuccess()) {
+ adapter.complete(future.getNow());
+ } else {
+ adapter.completeExceptionally(future.cause());
+ }
+ } else {
+ future.addListener((Future<V> f) -> {
+ if (f.isSuccess()) {
+ adapter.complete(f.getNow());
+ } else {
+ adapter.completeExceptionally(f.cause());
+ }
+ });
+ }
+ return adapter;
+ }
+
+ private boolean isPortAllowed(int port) {
+ if (allowAnyTargetPort) {
+ return true;
+ }
+ for (int allowedPort : allowedTargetPorts) {
+ if (allowedPort == port) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private boolean isIPAddressAllowed(InetSocketAddress resolvedAddress) {
+ if (allowAnyIPAddress) {
+ return true;
+ }
+ byte[] addressBytes = resolvedAddress.getAddress().getAddress();
+ IPAddress candidateAddress =
+ addressBytes.length == 4 ? new IPv4Address(addressBytes) : new IPv6Address(addressBytes);
+ for (IPAddress allowedAddress : allowedIPAddresses) {
+ if (allowedAddress.contains(candidateAddress)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private boolean isHostAllowed(String host) {
+ if (allowAnyHostName) {
+ return true;
+ }
+ boolean matched = false;
+ for (Pattern allowedHostName : allowedHostNames) {
+ if (allowedHostName.matcher(host).matches()) {
+ matched = true;
+ break;
+ }
+ }
+ return matched;
+ }
+}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index 3e3d668..f761517 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -39,6 +39,7 @@ import io.netty.handler.codec.haproxy.HAProxyMessage;
import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
import io.netty.handler.codec.haproxy.HAProxyProxiedProtocol;
import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
@@ -49,6 +50,7 @@ import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.net.ssl.SSLSession;
@@ -73,25 +75,25 @@ import org.slf4j.LoggerFactory;
public class DirectProxyHandler {
@Getter
- private Channel inboundChannel;
+ private final Channel inboundChannel;
@Getter
Channel outboundChannel;
@Getter
private final Rate inboundChannelRequestsRate;
protected static Map<ChannelId, ChannelId> inboundOutboundChannelMap = new ConcurrentHashMap<>();
- private String originalPrincipal;
- private AuthData clientAuthData;
- private String clientAuthMethod;
- private int protocolVersion;
+ private final String originalPrincipal;
+ private final AuthData clientAuthData;
+ private final String clientAuthMethod;
public static final String TLS_HANDLER = "tls";
private final Authentication authentication;
- private final Supplier<SslHandler> sslHandlerSupplier;
private AuthenticationDataProvider authenticationDataProvider;
- private ProxyService service;
+ private final ProxyService service;
+ private final Runnable onHandshakeCompleteAction;
public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, String targetBrokerUrl,
- int protocolVersion, Supplier<SslHandler> sslHandlerSupplier) {
+ InetSocketAddress targetBrokerAddress, int protocolVersion,
+ Supplier<SslHandler> sslHandlerSupplier) {
this.service = service;
this.authentication = proxyConnection.getClientAuthentication();
this.inboundChannel = proxyConnection.ctx().channel();
@@ -99,8 +101,7 @@ public class DirectProxyHandler {
this.originalPrincipal = proxyConnection.clientAuthRole;
this.clientAuthData = proxyConnection.clientAuthData;
this.clientAuthMethod = proxyConnection.clientAuthMethod;
- this.protocolVersion = protocolVersion;
- this.sslHandlerSupplier = sslHandlerSupplier;
+ this.onHandshakeCompleteAction = proxyConnection::cancelKeepAliveTask;
ProxyConfiguration config = service.getConfiguration();
// Start the connection attempt.
@@ -109,13 +110,22 @@ public class DirectProxyHandler {
// switches when passing data between the 2
// connections
b.option(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);
+ int brokerProxyConnectTimeoutMs = service.getConfiguration().getBrokerProxyConnectTimeoutMs();
+ if (brokerProxyConnectTimeoutMs > 0) {
+ b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, brokerProxyConnectTimeoutMs);
+ }
b.group(inboundChannel.eventLoop()).channel(inboundChannel.getClass()).option(ChannelOption.AUTO_READ, false);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
- protected void initChannel(SocketChannel ch) throws Exception {
+ protected void initChannel(SocketChannel ch) {
if (sslHandlerSupplier != null) {
ch.pipeline().addLast(TLS_HANDLER, sslHandlerSupplier.get());
}
+ int brokerProxyReadTimeoutMs = service.getConfiguration().getBrokerProxyReadTimeoutMs();
+ if (brokerProxyReadTimeoutMs > 0) {
+ ch.pipeline().addLast("readTimeoutHandler",
+ new ReadTimeoutHandler(brokerProxyReadTimeoutMs, TimeUnit.MILLISECONDS));
+ }
ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
ch.pipeline().addLast("proxyOutboundHandler", new ProxyBackendHandler(config, protocolVersion));
@@ -133,7 +143,7 @@ public class DirectProxyHandler {
return;
}
- ChannelFuture f = b.connect(targetBroker.getHost(), targetBroker.getPort());
+ ChannelFuture f = b.connect(targetBrokerAddress);
outboundChannel = f.channel();
f.addListener(future -> {
if (!future.isSuccess()) {
@@ -211,8 +221,8 @@ public class DirectProxyHandler {
private BackendState state = BackendState.Init;
private String remoteHostName;
protected ChannelHandlerContext ctx;
- private ProxyConfiguration config;
- private int protocolVersion;
+ private final ProxyConfiguration config;
+ private final int protocolVersion;
public ProxyBackendHandler(ProxyConfiguration config, int protocolVersion) {
this.config = config;
@@ -225,7 +235,7 @@ public class DirectProxyHandler {
// Send the Connect command to broker
authenticationDataProvider = authentication.getAuthData(remoteHostName);
AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
- ByteBuf command = null;
+ ByteBuf command;
command = Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion, "Pulsar proxy",
null /* target broker */, originalPrincipal, clientAuthData, clientAuthMethod);
outboundChannel.writeAndFlush(command);
@@ -293,12 +303,11 @@ public class DirectProxyHandler {
outboundChannel.read();
} catch (Exception e) {
log.error("Error mutual verify", e);
- return;
}
}
@Override
- public void operationComplete(Future<Void> future) throws Exception {
+ public void operationComplete(Future<Void> future) {
// This is invoked when the write operation on the paired connection
// is completed
if (future.isSuccess()) {
@@ -317,6 +326,7 @@ public class DirectProxyHandler {
@Override
protected void handleConnected(CommandConnected connected) {
+ checkArgument(state == BackendState.Init, "Unexpected state %s. BackendState.Init was expected.", state);
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Received Connected from broker", inboundChannel, outboundChannel);
}
@@ -332,58 +342,68 @@ public class DirectProxyHandler {
state = BackendState.HandshakeCompleted;
- ChannelFuture channelFuture;
- if (connected.hasMaxMessageSize()) {
- channelFuture = inboundChannel.writeAndFlush(
- Commands.newConnected(connected.getProtocolVersion(), connected.getMaxMessageSize()));
- } else {
- channelFuture = inboundChannel.writeAndFlush(Commands.newConnected(connected.getProtocolVersion()));
- }
+ onHandshakeCompleteAction.run();
+ startDirectProxying(connected);
+
+ int maxMessageSize =
+ connected.hasMaxMessageSize() ? connected.getMaxMessageSize() : Commands.INVALID_MAX_MESSAGE_SIZE;
+ inboundChannel.writeAndFlush(Commands.newConnected(connected.getProtocolVersion(), maxMessageSize))
+ .addListener(future -> {
+ if (future.isSuccess()) {
+ // Start reading from both connections
+ inboundChannel.read();
+ outboundChannel.read();
+ } else {
+ log.warn("[{}] [{}] Failed to write to inbound connection. Closing both connections.",
+ inboundChannel,
+ outboundChannel, future.cause());
+ inboundChannel.close();
+ }
+ });
+ }
- channelFuture.addListener(future -> {
+ private void startDirectProxying(CommandConnected connected) {
+ if (service.getProxyLogLevel() == 0) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Removing decoder from pipeline", inboundChannel, outboundChannel);
}
- if (service.getProxyLogLevel() == 0) {
- // direct tcp proxy
- inboundChannel.pipeline().remove("frameDecoder");
- outboundChannel.pipeline().remove("frameDecoder");
+ // direct tcp proxy
+ inboundChannel.pipeline().remove("frameDecoder");
+ outboundChannel.pipeline().remove("frameDecoder");
+ } else {
+ // Enable parsing feature, proxyLogLevel(1 or 2)
+ // Add parser handler
+ if (connected.hasMaxMessageSize()) {
+ inboundChannel.pipeline()
+ .replace("frameDecoder", "newFrameDecoder",
+ new LengthFieldBasedFrameDecoder(connected.getMaxMessageSize()
+ + Commands.MESSAGE_SIZE_FRAME_PADDING,
+ 0, 4, 0, 4));
+ outboundChannel.pipeline().replace("frameDecoder", "newFrameDecoder",
+ new LengthFieldBasedFrameDecoder(
+ connected.getMaxMessageSize()
+ + Commands.MESSAGE_SIZE_FRAME_PADDING,
+ 0, 4, 0, 4));
+
+ inboundChannel.pipeline().addBefore("handler", "inboundParser",
+ new ParserProxyHandler(service, inboundChannel,
+ ParserProxyHandler.FRONTEND_CONN,
+ connected.getMaxMessageSize()));
+ outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser",
+ new ParserProxyHandler(service, outboundChannel,
+ ParserProxyHandler.BACKEND_CONN,
+ connected.getMaxMessageSize()));
} else {
- // Enable parsing feature, proxyLogLevel(1 or 2)
- // Add parser handler
- if (connected.hasMaxMessageSize()) {
- inboundChannel.pipeline().replace("frameDecoder", "newFrameDecoder",
- new LengthFieldBasedFrameDecoder(connected.getMaxMessageSize()
- + Commands.MESSAGE_SIZE_FRAME_PADDING,
- 0, 4, 0, 4));
- outboundChannel.pipeline().replace("frameDecoder", "newFrameDecoder",
- new LengthFieldBasedFrameDecoder(
- connected.getMaxMessageSize()
- + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
-
- inboundChannel.pipeline().addBefore("handler", "inboundParser",
- new ParserProxyHandler(service, inboundChannel,
- ParserProxyHandler.FRONTEND_CONN,
- connected.getMaxMessageSize()));
- outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser",
- new ParserProxyHandler(service, outboundChannel,
- ParserProxyHandler.BACKEND_CONN,
- connected.getMaxMessageSize()));
- } else {
- inboundChannel.pipeline().addBefore("handler", "inboundParser",
- new ParserProxyHandler(service, inboundChannel,
- ParserProxyHandler.FRONTEND_CONN,
- Commands.DEFAULT_MAX_MESSAGE_SIZE));
- outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser",
- new ParserProxyHandler(service, outboundChannel,
- ParserProxyHandler.BACKEND_CONN,
- Commands.DEFAULT_MAX_MESSAGE_SIZE));
- }
+ inboundChannel.pipeline().addBefore("handler", "inboundParser",
+ new ParserProxyHandler(service, inboundChannel,
+ ParserProxyHandler.FRONTEND_CONN,
+ Commands.DEFAULT_MAX_MESSAGE_SIZE));
+ outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser",
+ new ParserProxyHandler(service, outboundChannel,
+ ParserProxyHandler.BACKEND_CONN,
+ Commands.DEFAULT_MAX_MESSAGE_SIZE));
}
- // Start reading from both connections
- inboundChannel.read();
- outboundChannel.read();
- });
+ }
}
@Override
@@ -404,7 +424,7 @@ public class DirectProxyHandler {
private boolean verifyTlsHostName(String hostname, ChannelHandlerContext ctx) {
ChannelHandler sslHandler = ctx.channel().pipeline().get("tls");
- SSLSession sslSession = null;
+ SSLSession sslSession;
if (sslHandler != null) {
sslSession = ((SslHandler) sslHandler).engine().getSession();
return (new TlsHostnameVerifier()).verify(hostname, sslSession);
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index 3847f82..a5c44ee 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -49,6 +49,8 @@ public class ProxyConfiguration implements PulsarConfiguration {
@Category
private static final String CATEGORY_BROKER_DISCOVERY = "Broker Discovery";
@Category
+ private static final String CATEGORY_BROKER_PROXY = "Broker Proxy";
+ @Category
private static final String CATEGORY_AUTHENTICATION = "Proxy Authentication";
@Category
private static final String CATEGORY_AUTHORIZATION = "Proxy Authorization";
@@ -136,6 +138,43 @@ public class ProxyConfiguration implements PulsarConfiguration {
)
private String functionWorkerWebServiceURLTLS;
+ @FieldContext(category = CATEGORY_BROKER_PROXY,
+ doc = "When enabled, checks that the target broker is active before connecting. "
+ + "zookeeperServers and configurationStoreServers must be configured in proxy configuration "
+ + "for retrieving the active brokers.")
+ private boolean checkActiveBrokers = false;
+
+ @FieldContext(
+ category = CATEGORY_BROKER_PROXY,
+ doc = "Broker proxy connect timeout.\n"
+ + "The timeout value for Broker proxy connect timeout is in millisecond. Set to 0 to disable."
+ )
+ private int brokerProxyConnectTimeoutMs = 10000;
+
+ @FieldContext(
+ category = CATEGORY_BROKER_PROXY,
+ doc = "Broker proxy read timeout.\n"
+ + "The timeout value for Broker proxy read timeout is in millisecond. Set to 0 to disable."
+ )
+ private int brokerProxyReadTimeoutMs = 75000;
+
+ @FieldContext(
+ category = CATEGORY_BROKER_PROXY,
+ doc = "Allowed broker target host names. "
+ + "Supports multiple comma separated entries and a wildcard.")
+ private String brokerProxyAllowedHostNames = "*";
+
+ @FieldContext(
+ category = CATEGORY_BROKER_PROXY,
+ doc = "Allowed broker target ip addresses or ip networks / netmasks. "
+ + "Supports multiple comma separated entries.")
+ private String brokerProxyAllowedIPAddresses = "*";
+
+ @FieldContext(
+ category = CATEGORY_BROKER_PROXY,
+ doc = "Allowed broker target ports")
+ private String brokerProxyAllowedTargetPorts = "6650,6651";
+
@FieldContext(
category = CATEGORY_SERVER,
doc = "Hostname or IP address the service binds on"
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index c559120..f1e45f0 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -20,7 +20,10 @@ package org.apache.pulsar.proxy.server;
import static com.google.common.base.Preconditions.checkArgument;
+import io.netty.handler.codec.haproxy.HAProxyMessage;
import java.net.SocketAddress;
+import java.util.Collections;
+import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -28,7 +31,7 @@ import java.util.function.Supplier;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
-import io.netty.handler.codec.haproxy.HAProxyMessage;
+import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationState;
@@ -50,6 +53,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata;
import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
+import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,11 +70,12 @@ import lombok.Getter;
*
*/
public class ProxyConnection extends PulsarHandler implements FutureListener<Void> {
+ private static final Logger LOG = LoggerFactory.getLogger(ProxyConnection.class);
// ConnectionPool is used by the proxy to issue lookup requests
private ConnectionPool connectionPool;
private final AtomicLong requestIdGenerator =
new AtomicLong(ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE / 2));
- private ProxyService service;
+ private final ProxyService service;
private Authentication clientAuthentication;
AuthenticationDataSource authenticationData;
private State state;
@@ -79,6 +84,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
private LookupProxyHandler lookupProxyHandler = null;
@Getter
private DirectProxyHandler directProxyHandler = null;
+ private final BrokerProxyValidator brokerProxyValidator;
String clientAuthRole;
AuthData clientAuthData;
String clientAuthMethod;
@@ -108,6 +114,8 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
// looking into it
ProxyConnectionToBroker,
+ Closing,
+
Closed,
}
@@ -120,6 +128,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
this.service = proxyService;
this.state = State.Init;
this.sslHandlerSupplier = sslHandlerSupplier;
+ this.brokerProxyValidator = service.getBrokerProxyValidator();
}
@Override
@@ -127,6 +136,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
super.channelRegistered(ctx);
ProxyService.activeConnections.inc();
if (ProxyService.activeConnections.get() > service.getConfiguration().getMaxConcurrentInboundConnections()) {
+ state = State.Closing;
ctx.close();
ProxyService.rejectedConnections.inc();
}
@@ -172,6 +182,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ state = State.Closing;
super.exceptionCaught(ctx, cause);
LOG.warn("[{}] Got exception {} : {} {}", remoteAddress, cause.getClass().getSimpleName(), cause.getMessage(),
ClientCnx.isKnownException(cause) ? null : cause);
@@ -210,7 +221,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
}
@Override
- public void operationComplete(Future<Void> future) throws Exception {
+ public void operationComplete(Future<Void> future) {
// This is invoked when the write operation on the paired connection is
// completed
if (future.isSuccess()) {
@@ -246,14 +257,51 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
}
LOG.info("[{}] complete connection, init proxy handler. authenticated with {} role {}, hasProxyToBrokerUrl: {}",
- remoteAddress, authMethod, clientAuthRole, hasProxyToBrokerUrl);
+ remoteAddress, authMethod, clientAuthRole, hasProxyToBrokerUrl);
if (hasProxyToBrokerUrl) {
- // Client already knows which broker to connect. Let's open a
- // connection there and just pass bytes in both directions
- state = State.ProxyConnectionToBroker;
- directProxyHandler = new DirectProxyHandler(service, this, proxyToBrokerUrl,
- protocolVersionToAdvertise, sslHandlerSupplier);
- cancelKeepAliveTask();
+ // Optimize proxy connection to fail-fast if the target broker isn't active
+ // Pulsar client will retry connecting after a back off timeout
+ if (service.getConfiguration().isCheckActiveBrokers()
+ && !isBrokerActive(proxyToBrokerUrl)) {
+ state = State.Closing;
+ LOG.warn("[{}] Target broker '{}' isn't available. authenticated with {} role {}.",
+ remoteAddress, proxyToBrokerUrl, authMethod, clientAuthRole);
+ ctx()
+ .writeAndFlush(
+ Commands.newError(-1, ServerError.ServiceNotReady, "Target broker isn't available."))
+ .addListener(future -> ctx().close());
+ return;
+ }
+
+ brokerProxyValidator.resolveAndCheckTargetAddress(proxyToBrokerUrl)
+ .thenAccept(address -> ctx().executor().submit(() -> {
+ // Client already knows which broker to connect. Let's open a
+ // connection there and just pass bytes in both directions
+ state = State.ProxyConnectionToBroker;
+ directProxyHandler = new DirectProxyHandler(service, this, proxyToBrokerUrl, address,
+ protocolVersionToAdvertise, sslHandlerSupplier);
+ }))
+ .exceptionally(throwable -> {
+ if (throwable instanceof TargetAddressDeniedException
+ || throwable.getCause() instanceof TargetAddressDeniedException) {
+ TargetAddressDeniedException targetAddressDeniedException =
+ (TargetAddressDeniedException) (throwable instanceof TargetAddressDeniedException
+ ? throwable : throwable.getCause());
+
+ LOG.warn("[{}] Target broker '{}' cannot be validated. {}. authenticated with {} role {}.",
+ remoteAddress, proxyToBrokerUrl, targetAddressDeniedException.getMessage(),
+ authMethod, clientAuthRole);
+ } else {
+ LOG.error("[{}] Error validating target broker '{}'. authenticated with {} role {}.",
+ remoteAddress, proxyToBrokerUrl, authMethod, clientAuthRole, throwable);
+ }
+ ctx()
+ .writeAndFlush(
+ Commands.newError(-1, ServerError.ServiceNotReady,
+ "Target broker cannot be validated."))
+ .addListener(future -> ctx().close());
+ return null;
+ });
} else {
// Client is doing a lookup, we can consider the handshake complete
// and we'll take care of just topics and
@@ -305,6 +353,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
if (remoteEndpointProtocolVersion < PulsarApi.ProtocolVersion.v10_VALUE) {
LOG.warn("[{}] Client doesn't support connecting through proxy", remoteAddress);
+ state = State.Closing;
ctx.close();
return;
}
@@ -489,6 +538,36 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
return haProxyMessage;
}
- private static final Logger LOG = LoggerFactory.getLogger(ProxyConnection.class);
+    /**
+     * Checks whether the requested target broker is among the brokers currently reported as available.
+     *
+     * @param targetBrokerHostPort the target broker in {@code host:port} form
+     * @return {@code true} if any available broker advertises a {@code pulsar://} or
+     *         {@code pulsar+ssl://} service URL that consists of exactly that scheme followed by
+     *         the given host and port; {@code false} otherwise
+     */
+    private boolean isBrokerActive(String targetBrokerHostPort) {
+        return getAvailableBrokers().stream().anyMatch(broker ->
+                matchesHostAndPort("pulsar://", broker.getPulsarServiceUrl(), targetBrokerHostPort)
+                        || matchesHostAndPort("pulsar+ssl://", broker.getPulsarServiceUrlTls(),
+                                targetBrokerHostPort));
+    }
+
+    /**
+     * Returns the brokers currently known to the proxy's discovery provider.
+     *
+     * @return the available brokers, or an empty list when no discovery provider is configured
+     *         or when the provider fails with a {@code PulsarServerException}; both cases are logged
+     */
+    private List<? extends ServiceLookupData> getAvailableBrokers() {
+        if (service.getDiscoveryProvider() == null) {
+            // Each fragment ends with a space so the concatenated sentences stay separated in the log.
+            LOG.warn("Unable to retrieve active brokers. service.getDiscoveryProvider() is null. "
+                    + "zookeeperServers and configurationStoreServers must be configured in proxy configuration "
+                    + "when checkActiveBrokers is enabled.");
+            return Collections.emptyList();
+        }
+        try {
+            return service.getDiscoveryProvider().getAvailableBrokers();
+        } catch (PulsarServerException e) {
+            // Best effort: a failed lookup is reported as "no brokers available" instead of propagating.
+            LOG.error("Unable to get available brokers", e);
+            return Collections.emptyList();
+        }
+    }
+ static boolean matchesHostAndPort(String expectedPrefix, String pulsarServiceUrl, String brokerHostPort) {
+ return pulsarServiceUrl != null
+ && pulsarServiceUrl.length() == expectedPrefix.length() + brokerHostPort.length()
+ && pulsarServiceUrl.startsWith(expectedPrefix)
+ && pulsarServiceUrl.startsWith(brokerHostPort, expectedPrefix.length());
+ }
}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 58ba3cf..c934883 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -26,6 +26,8 @@ import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
+import io.netty.resolver.dns.DnsNameResolver;
+import io.netty.resolver.dns.DnsNameResolverBuilder;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
@@ -67,6 +69,10 @@ import com.google.common.collect.Sets;
public class ProxyService implements Closeable {
private final ProxyConfiguration proxyConfig;
+ @Getter
+ private final DnsNameResolver dnsNameResolver;
+ @Getter
+ private final BrokerProxyValidator brokerProxyValidator;
private String serviceUrl;
private String serviceUrlTls;
private ConfigurationCacheService configurationCacheService;
@@ -139,6 +145,15 @@ public class ProxyService implements Closeable {
this.workerGroup = EventLoopUtil.newEventLoopGroup(numThreads, workersThreadFactory);
this.authenticationService = authenticationService;
+ DnsNameResolverBuilder dnsNameResolverBuilder = new DnsNameResolverBuilder(workerGroup.next())
+ .channelType(EventLoopUtil.getDatagramChannelClass(workerGroup));
+ dnsNameResolver = dnsNameResolverBuilder.build();
+
+ brokerProxyValidator = new BrokerProxyValidator(dnsNameResolver.asAddressResolver(),
+ proxyConfig.getBrokerProxyAllowedHostNames(),
+ proxyConfig.getBrokerProxyAllowedIPAddresses(),
+ proxyConfig.getBrokerProxyAllowedTargetPorts());
+
statsExecutor = Executors
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("proxy-stats-executor"));
statsExecutor.schedule(()->{
@@ -220,6 +235,8 @@ public class ProxyService implements Closeable {
}
public void close() throws IOException {
+ dnsNameResolver.close();
+
if (discoveryProvider != null) {
discoveryProvider.close();
}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
index 25e0f59..2a01d5b 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
@@ -27,6 +27,7 @@ import static org.slf4j.bridge.SLF4JBridgeHandler.install;
import static org.slf4j.bridge.SLF4JBridgeHandler.removeHandlersForRootLogger;
import com.google.common.annotations.VisibleForTesting;
+import lombok.Getter;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationService;
@@ -87,8 +88,10 @@ public class ProxyServiceStarter {
private ProxyConfiguration config;
+ @Getter
private ProxyService proxyService;
+ @Getter
private WebServer server;
public ProxyServiceStarter(String[] args) throws Exception {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
index 658dd87..a033a87 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
@@ -21,6 +21,8 @@ package org.apache.pulsar.proxy.server;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.timeout.ReadTimeoutHandler;
+import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.AuthenticationFactory;
@@ -46,6 +48,7 @@ public class ServiceChannelInitializer extends ChannelInitializer<SocketChannel>
private final ProxyService proxyService;
private final boolean enableTls;
private final boolean tlsEnabledWithKeyStore;
+ private final int brokerProxyReadTimeoutMs;
private SslContextAutoRefreshBuilder<SslContext> serverSslCtxRefresher;
private SslContextAutoRefreshBuilder<SslContext> clientSslCtxRefresher;
@@ -58,6 +61,7 @@ public class ServiceChannelInitializer extends ChannelInitializer<SocketChannel>
this.proxyService = proxyService;
this.enableTls = enableTls;
this.tlsEnabledWithKeyStore = serviceConfig.isTlsEnabledWithKeyStore();
+ this.brokerProxyReadTimeoutMs = serviceConfig.getBrokerProxyReadTimeoutMs();
if (enableTls) {
if (tlsEnabledWithKeyStore) {
@@ -127,6 +131,10 @@ public class ServiceChannelInitializer extends ChannelInitializer<SocketChannel>
ch.pipeline().addLast(TLS_HANDLER,
new SslHandler(serverSSLContextAutoRefreshBuilder.get().createSSLEngine()));
}
+ if (brokerProxyReadTimeoutMs > 0) {
+ ch.pipeline().addLast("readTimeoutHandler",
+ new ReadTimeoutHandler(brokerProxyReadTimeoutMs, TimeUnit.MILLISECONDS));
+ }
if (proxyService.getConfiguration().isHaProxyProtocolEnabled()) {
ch.pipeline().addLast(OptionalProxyProtocolDecoder.NAME, new OptionalProxyProtocolDecoder());
}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/TargetAddressDeniedException.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/TargetAddressDeniedException.java
new file mode 100644
index 0000000..e62525f
--- /dev/null
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/TargetAddressDeniedException.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.proxy.server;
+
+/**
+ * Unchecked exception indicating that a requested proxy target address was denied.
+ *
+ * <p>{@link ProxyConnection} checks for this type (directly or as the cause) when the future returned by
+ * {@code BrokerProxyValidator#resolveAndCheckTargetAddress} completes exceptionally, and logs its
+ * message at warn level as "Target broker ... cannot be validated".
+ */
+class TargetAddressDeniedException extends RuntimeException {
+ public TargetAddressDeniedException(String message) {
+ super(message);
+ }
+}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
index 7b34695..a1c6e63 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
@@ -79,6 +79,7 @@ public class AuthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest {
proxyConfig.setAuthenticationEnabled(true);
proxyConfig.setAuthorizationEnabled(true);
proxyConfig.setServicePort(Optional.of(0));
+ proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setServicePortTls(Optional.of(0));
proxyConfig.setWebServicePort(Optional.of(0));
proxyConfig.setWebServicePortTls(Optional.of(0));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/BrokerProxyValidatorTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/BrokerProxyValidatorTest.java
new file mode 100644
index 0000000..8e45755
--- /dev/null
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/BrokerProxyValidatorTest.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.proxy.server;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import com.google.common.net.InetAddresses;
+import io.netty.resolver.AddressResolver;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.SucceededFuture;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.concurrent.ExecutionException;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for {@link BrokerProxyValidator}. DNS resolution is replaced by a mocked
+ * {@link AddressResolver} that maps every host name to one fixed IP address, so each test only
+ * exercises the allowed host name / IP address / port settings passed to the validator.
+ */
+public class BrokerProxyValidatorTest {
+
+    /** A host, resolved IP and port that all match the allow-lists resolve to the mocked address. */
+    @Test
+    public void shouldAllowValidInput() throws Exception {
+        BrokerProxyValidator brokerProxyValidator = new BrokerProxyValidator(
+                createMockedAddressResolver("1.2.3.4"),
+                "myhost",
+                "1.2.0.0/16",
+                "6650");
+        InetSocketAddress inetSocketAddress = brokerProxyValidator.resolveAndCheckTargetAddress("myhost:6650").get();
+        assertNotNull(inetSocketAddress);
+        assertEquals(inetSocketAddress.getAddress().getHostAddress(), "1.2.3.4");
+        assertEquals(inetSocketAddress.getPort(), 6650);
+    }
+
+    /** A host name outside the allowed host names fails the returned future. */
+    @Test(expectedExceptions = ExecutionException.class,
+            expectedExceptionsMessageRegExp = ".*Given host in 'myhost:6650' isn't allowed.")
+    public void shouldPreventInvalidHostName() throws Exception {
+        BrokerProxyValidator brokerProxyValidator = new BrokerProxyValidator(
+                createMockedAddressResolver("1.2.3.4"),
+                "allowedhost",
+                "1.2.0.0/16",
+                "6650");
+        brokerProxyValidator.resolveAndCheckTargetAddress("myhost:6650").get();
+    }
+
+    /** An allowed host name that resolves to an IP outside the allowed range fails the returned future. */
+    @Test(expectedExceptions = ExecutionException.class,
+            expectedExceptionsMessageRegExp = ".* The IP address of the given host and port 'myhost:6650' isn't allowed.")
+    public void shouldPreventInvalidIPAddress() throws Exception {
+        BrokerProxyValidator brokerProxyValidator = new BrokerProxyValidator(
+                createMockedAddressResolver("1.2.3.4"),
+                "myhost",
+                "1.3.0.0/16",
+                "6650");
+        brokerProxyValidator.resolveAndCheckTargetAddress("myhost:6650").get();
+    }
+
+    /** A {@code *.mydomain} host name pattern accepts {@code myhost.mydomain}. */
+    @Test
+    public void shouldSupportHostNamePattern() throws Exception {
+        BrokerProxyValidator brokerProxyValidator = new BrokerProxyValidator(
+                createMockedAddressResolver("1.2.3.4"),
+                "*.mydomain",
+                "1.2.0.0/16",
+                "6650");
+        brokerProxyValidator.resolveAndCheckTargetAddress("myhost.mydomain:6650").get();
+    }
+
+    /** A bare {@code *} for both host names and IP addresses accepts the target. */
+    @Test
+    public void shouldAllowAllWithWildcard() throws Exception {
+        BrokerProxyValidator brokerProxyValidator = new BrokerProxyValidator(
+                createMockedAddressResolver("1.2.3.4"),
+                "*",
+                "*",
+                "6650");
+        brokerProxyValidator.resolveAndCheckTargetAddress("myhost.mydomain:6650").get();
+    }
+
+    /**
+     * Builds a resolver mock whose {@code resolve} call succeeds immediately with
+     * {@code ipAddressResult} and the port of the address it was asked to resolve.
+     *
+     * @param ipAddressResult the IP address literal every lookup resolves to
+     * @return the mocked resolver
+     */
+    // mock(Class) can only hand back the raw AddressResolver type; the unchecked assignment is intended.
+    @SuppressWarnings("unchecked")
+    private AddressResolver<InetSocketAddress> createMockedAddressResolver(String ipAddressResult) {
+        AddressResolver<InetSocketAddress> inetSocketAddressResolver = mock(AddressResolver.class);
+        when(inetSocketAddressResolver.resolve(any())).then(invocationOnMock -> {
+            InetSocketAddress address = (InetSocketAddress) invocationOnMock.getArgument(0);
+            return new SucceededFuture<SocketAddress>(mock(EventExecutor.class),
+                    new InetSocketAddress(InetAddresses.forString(ipAddressResult), address.getPort()));
+        });
+        return inetSocketAddressResolver;
+    }
+}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java
index ad28ebe..4bf30b0 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java
@@ -72,6 +72,7 @@ public class ProxyAdditionalServletTest extends MockedPulsarServiceBaseTest {
internalSetup();
proxyConfig.setServicePort(Optional.of(0));
+ proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setWebServicePort(Optional.of(0));
proxyConfig.setZookeeperServers(DUMMY_VALUE);
proxyConfig.setConfigurationStoreServers(DUMMY_VALUE);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
index ddc90df..8bc0da4 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
@@ -104,6 +104,7 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
proxyConfig.setAuthenticationEnabled(true);
proxyConfig.setServicePort(Optional.of(0));
+ proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setServicePortTls(Optional.of(0));
proxyConfig.setWebServicePort(Optional.of(0));
proxyConfig.setWebServicePortTls(Optional.of(0));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
index bcdd031..4bca0ea 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
@@ -212,6 +212,7 @@ public class ProxyAuthenticationTest extends ProducerConsumerBase {
ProxyConfiguration proxyConfig = new ProxyConfiguration();
proxyConfig.setAuthenticationEnabled(true);
proxyConfig.setServicePort(Optional.of(0));
+ proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setWebServicePort(Optional.of(0));
proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionTest.java
new file mode 100644
index 0000000..5f533e3
--- /dev/null
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionTest.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for the static helpers of {@link ProxyConnection}.
+ */
+public class ProxyConnectionTest {
+
+    /**
+     * Verifies that {@code matchesHostAndPort} accepts only a URL made of exactly the expected
+     * prefix followed by the given host and port.
+     */
+    @Test
+    public void testMatchesHostAndPort() {
+        assertTrue(ProxyConnection
+                .matchesHostAndPort("pulsar://", "pulsar://1.2.3.4:6650", "1.2.3.4:6650"));
+        assertTrue(ProxyConnection
+                .matchesHostAndPort("pulsar+ssl://", "pulsar+ssl://1.2.3.4:6650", "1.2.3.4:6650"));
+        assertFalse(ProxyConnection
+                .matchesHostAndPort("pulsar://", "pulsar://1.2.3.4:12345", "5.6.7.8:1234"));
+        assertFalse(ProxyConnection
+                .matchesHostAndPort("pulsar://", "pulsar://1.2.3.4:12345", "1.2.3.4:1234"));
+        // A missing service URL is reported as "no match" rather than failing.
+        assertFalse(ProxyConnection
+                .matchesHostAndPort("pulsar+ssl://", null, "1.2.3.4:6650"));
+        // Same host and port under a different scheme must not match.
+        assertFalse(ProxyConnection
+                .matchesHostAndPort("pulsar+ssl://", "pulsar://1.2.3.4:6650", "1.2.3.4:6650"));
+        // Same overall length but a different host must not match.
+        assertFalse(ProxyConnection
+                .matchesHostAndPort("pulsar://", "pulsar://1.2.3.4:6650", "1.2.3.5:6650"));
+    }
+}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
index c30ae4d..0b064f8 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
@@ -53,6 +53,7 @@ public class ProxyConnectionThrottlingTest extends MockedPulsarServiceBaseTest {
internalSetup();
proxyConfig.setServicePort(Optional.of(0));
+ proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setZookeeperServers(DUMMY_VALUE);
proxyConfig.setConfigurationStoreServers(DUMMY_VALUE);
proxyConfig.setMaxConcurrentLookupRequests(NUM_CONCURRENT_LOOKUP);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java
index 1f0420c..4859e2c 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java
@@ -57,6 +57,7 @@ public class ProxyEnableHAProxyProtocolTest extends MockedPulsarServiceBaseTest
internalSetup();
proxyConfig.setServicePort(Optional.ofNullable(0));
+ proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setZookeeperServers(DUMMY_VALUE);
proxyConfig.setConfigurationStoreServers(DUMMY_VALUE);
proxyConfig.setHaProxyProtocolEnabled(true);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
index a39b3d0..82150ef 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
@@ -104,6 +104,7 @@ public class ProxyForwardAuthDataTest extends ProducerConsumerBase {
proxyConfig.setAuthenticationEnabled(true);
proxyConfig.setServicePort(Optional.of(0));
+ proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setWebServicePort(Optional.of(0));
proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
proxyConfig.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java
index a075851..444b013 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java
@@ -79,6 +79,7 @@ public class ProxyKeyStoreTlsTestWithAuth extends MockedPulsarServiceBaseTest {
internalSetup();
proxyConfig.setServicePort(Optional.of(0));
+ proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setServicePortTls(Optional.of(0));
proxyConfig.setWebServicePort(Optional.of(0));
proxyConfig.setWebServicePortTls(Optional.of(0));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java
index b920ef4..47dcbb7 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java
@@ -74,6 +74,7 @@ public class ProxyKeyStoreTlsTestWithoutAuth extends MockedPulsarServiceBaseTest
internalSetup();
proxyConfig.setServicePort(Optional.of(0));
+ proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setServicePortTls(Optional.of(0));
proxyConfig.setWebServicePort(Optional.of(0));
proxyConfig.setWebServicePortTls(Optional.of(0));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
index 8949f8d..a82d40b 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
@@ -52,6 +52,7 @@ public class ProxyLookupThrottlingTest extends MockedPulsarServiceBaseTest {
internalSetup();
proxyConfig.setServicePort(Optional.of(0));
+ proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setZookeeperServers(DUMMY_VALUE);
proxyConfig.setConfigurationStoreServers(DUMMY_VALUE);
proxyConfig.setMaxConcurrentLookupRequests(NUM_CONCURRENT_LOOKUP);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
index a218068..7f64bbf 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
@@ -72,6 +72,7 @@ public class ProxyParserTest extends MockedPulsarServiceBaseTest {
internalSetup();
proxyConfig.setServicePort(Optional.of(0));
+ proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setZookeeperServers(DUMMY_VALUE);
proxyConfig.setConfigurationStoreServers(DUMMY_VALUE);
//enable full parsing feature
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
index 57018ea..f0eb11f 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
@@ -209,6 +209,7 @@ public class ProxyRolesEnforcementTest extends ProducerConsumerBase {
proxyConfig.setAuthenticationEnabled(true);
proxyConfig.setServicePort(Optional.of(0));
+ proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setWebServicePort(Optional.of(0));
proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
index 6dbb663..65424f2 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
@@ -49,6 +49,8 @@ public class ProxyServiceStarterTest extends MockedPulsarServiceBaseTest {
static final String[] ARGS = new String[]{"-c", "./src/test/resources/proxy.conf"};
private ProxyServiceStarter serviceStarter;
+ private String serviceUrl;
+ private int webPort;
@Override
@BeforeClass
@@ -57,10 +59,14 @@ public class ProxyServiceStarterTest extends MockedPulsarServiceBaseTest {
serviceStarter = new ProxyServiceStarter(ARGS);
serviceStarter.getConfig().setBrokerServiceURL(pulsar.getBrokerServiceUrl());
serviceStarter.getConfig().setBrokerWebServiceURL(pulsar.getWebServiceAddress());
- serviceStarter.getConfig().setServicePort(Optional.of(11000));
+ serviceStarter.getConfig().setWebServicePort(Optional.of(0));
+ serviceStarter.getConfig().setServicePort(Optional.of(0));
serviceStarter.getConfig().setWebSocketServiceEnabled(true);
+ serviceStarter.getConfig().setBrokerProxyAllowedTargetPorts("*");
CollectorRegistry.defaultRegistry.clear();
serviceStarter.start();
+ serviceUrl = serviceStarter.getProxyService().getServiceUrl();
+ webPort = serviceStarter.getServer().getListenPortHTTP().get();
}
@Override
@@ -73,7 +79,7 @@ public class ProxyServiceStarterTest extends MockedPulsarServiceBaseTest {
@Test
public void testProducer() throws Exception {
@Cleanup
- PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:11000")
+ PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl)
.build();
@Cleanup
@@ -92,7 +98,7 @@ public class ProxyServiceStarterTest extends MockedPulsarServiceBaseTest {
WebSocketClient producerWebSocketClient = new WebSocketClient(producerClient);
producerWebSocketClient.start();
MyWebSocket producerSocket = new MyWebSocket();
- String produceUri = "ws://localhost:8080/ws/producer/persistent/sample/test/local/websocket-topic";
+ String produceUri = "ws://localhost:" + webPort + "/ws/producer/persistent/sample/test/local/websocket-topic";
Future<Session> producerSession = producerWebSocketClient.connect(producerSocket, URI.create(produceUri));
ProducerMessage produceRequest = new ProducerMessage();
@@ -103,7 +109,7 @@ public class ProxyServiceStarterTest extends MockedPulsarServiceBaseTest {
WebSocketClient consumerWebSocketClient = new WebSocketClient(consumerClient);
consumerWebSocketClient.start();
MyWebSocket consumerSocket = new MyWebSocket();
- String consumeUri = "ws://localhost:8080/ws/consumer/persistent/sample/test/local/websocket-topic/my-sub";
+ String consumeUri = "ws://localhost:" + webPort + "/ws/consumer/persistent/sample/test/local/websocket-topic/my-sub";
Future<Session> consumerSession = consumerWebSocketClient.connect(consumerSocket, URI.create(consumeUri));
consumerSession.get().getRemote().sendPing(ByteBuffer.wrap("ping".getBytes()));
producerSession.get().getRemote().sendString(ObjectMapperFactory.getThreadLocal().writeValueAsString(produceRequest));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
index 8198774..8cd7dee 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
@@ -53,6 +53,8 @@ public class ProxyServiceTlsStarterTest extends MockedPulsarServiceBaseTest {
private final String TLS_PROXY_CERT_FILE_PATH = "./src/test/resources/authentication/tls/server-cert.pem";
private final String TLS_PROXY_KEY_FILE_PATH = "./src/test/resources/authentication/tls/server-key.pem";
private ProxyServiceStarter serviceStarter;
+ private String serviceUrl;
+ private int webPort;
@Override
@BeforeClass
@@ -65,12 +67,17 @@ public class ProxyServiceTlsStarterTest extends MockedPulsarServiceBaseTest {
serviceStarter.getConfig().setBrokerServiceURLTLS(pulsar.getBrokerServiceUrlTls());
serviceStarter.getConfig().setBrokerWebServiceURL(pulsar.getWebServiceAddress());
serviceStarter.getConfig().setBrokerClientTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
- serviceStarter.getConfig().setServicePortTls(Optional.of(11043));
+ serviceStarter.getConfig().setServicePort(Optional.empty());
+ serviceStarter.getConfig().setServicePortTls(Optional.of(0));
+ serviceStarter.getConfig().setWebServicePort(Optional.of(0));
serviceStarter.getConfig().setTlsEnabledWithBroker(true);
serviceStarter.getConfig().setWebSocketServiceEnabled(true);
serviceStarter.getConfig().setTlsCertificateFilePath(TLS_PROXY_CERT_FILE_PATH);
serviceStarter.getConfig().setTlsKeyFilePath(TLS_PROXY_KEY_FILE_PATH);
+ serviceStarter.getConfig().setBrokerProxyAllowedTargetPorts("*");
serviceStarter.start();
+ serviceUrl = serviceStarter.getProxyService().getServiceUrlTls();
+ webPort = serviceStarter.getServer().getListenPortHTTP().get();
}
protected void doInitConf() throws Exception {
@@ -89,7 +96,7 @@ public class ProxyServiceTlsStarterTest extends MockedPulsarServiceBaseTest {
@Test
public void testProducer() throws Exception {
@Cleanup
- PulsarClient client = PulsarClient.builder().serviceUrl("pulsar+ssl://localhost:11043")
+ PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl)
.allowTlsInsecureConnection(false).tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH)
.build();
@@ -109,7 +116,7 @@ public class ProxyServiceTlsStarterTest extends MockedPulsarServiceBaseTest {
WebSocketClient producerWebSocketClient = new WebSocketClient(producerClient);
producerWebSocketClient.start();
MyWebSocket producerSocket = new MyWebSocket();
- String produceUri = "ws://localhost:8080/ws/producer/persistent/sample/test/local/websocket-topic";
+ String produceUri = "ws://localhost:" + webPort + "/ws/producer/persistent/sample/test/local/websocket-topic";
Future<Session> producerSession = producerWebSocketClient.connect(producerSocket, URI.create(produceUri));
ProducerMessage produceRequest = new ProducerMessage();
@@ -120,7 +127,7 @@ public class ProxyServiceTlsStarterTest extends MockedPulsarServiceBaseTest {
WebSocketClient consumerWebSocketClient = new WebSocketClient(consumerClient);
consumerWebSocketClient.start();
MyWebSocket consumerSocket = new MyWebSocket();
- String consumeUri = "ws://localhost:8080/ws/consumer/persistent/sample/test/local/websocket-topic/my-sub";
+ String consumeUri = "ws://localhost:" + webPort + "/ws/consumer/persistent/sample/test/local/websocket-topic/my-sub";
Future<Session> consumerSession = consumerWebSocketClient.connect(consumerSocket, URI.create(consumeUri));
consumerSession.get().getRemote().sendPing(ByteBuffer.wrap("ping".getBytes()));
producerSession.get().getRemote().sendString(ObjectMapperFactory.getThreadLocal().writeValueAsString(produceRequest));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
index e246625..a9febfc 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
@@ -70,6 +70,7 @@ public class ProxyStatsTest extends MockedPulsarServiceBaseTest {
internalSetup();
proxyConfig.setServicePort(Optional.of(0));
+ proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setWebServicePort(Optional.of(0));
proxyConfig.setZookeeperServers(DUMMY_VALUE);
proxyConfig.setConfigurationStoreServers(DUMMY_VALUE);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
index 8106ef3..4e8bfec 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
@@ -91,6 +91,7 @@ public class ProxyTest extends MockedPulsarServiceBaseTest {
internalSetup();
proxyConfig.setServicePort(Optional.ofNullable(0));
+ proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setZookeeperServers(DUMMY_VALUE);
proxyConfig.setConfigurationStoreServers(DUMMY_VALUE);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
index 465f4d2..942dab3 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
@@ -57,6 +57,7 @@ public class ProxyTlsTest extends MockedPulsarServiceBaseTest {
internalSetup();
proxyConfig.setServicePort(Optional.of(0));
+ proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setServicePortTls(Optional.of(0));
proxyConfig.setWebServicePort(Optional.of(0));
proxyConfig.setWebServicePortTls(Optional.of(0));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTestWithAuth.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTestWithAuth.java
index 9273a63..800cd36 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTestWithAuth.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTestWithAuth.java
@@ -58,6 +58,7 @@ public class ProxyTlsTestWithAuth extends MockedPulsarServiceBaseTest {
writer.close();
proxyConfig.setServicePort(Optional.of(0));
+ proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setServicePortTls(Optional.of(0));
proxyConfig.setWebServicePort(Optional.of(0));
proxyConfig.setWebServicePortTls(Optional.of(0));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
index 14902c3..546ed99 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
@@ -111,6 +111,7 @@ public class ProxyWithAuthorizationNegTest extends ProducerConsumerBase {
proxyConfig.setBrokerServiceURLTLS(pulsar.getBrokerServiceUrlTls());
proxyConfig.setServicePort(Optional.of(0));
+ proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setServicePortTls(Optional.of(0));
proxyConfig.setWebServicePort(Optional.of(0));
proxyConfig.setWebServicePortTls(Optional.of(0));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java
index 0e533a9..23a8227 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java
@@ -183,6 +183,7 @@ public class ProxyWithAuthorizationTest extends ProducerConsumerBase {
proxyConfig.setBrokerServiceURLTLS(pulsar.getBrokerServiceUrlTls());
proxyConfig.setServicePort(Optional.of(0));
+ proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setServicePortTls(Optional.of(0));
proxyConfig.setWebServicePort(Optional.of(0));
proxyConfig.setWebServicePortTls(Optional.of(0));
@@ -398,6 +399,7 @@ public class ProxyWithAuthorizationTest extends ProducerConsumerBase {
proxyConfig.setBrokerServiceURLTLS(pulsar.getBrokerServiceUrlTls());
proxyConfig.setServicePort(Optional.of(0));
+ proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setServicePortTls(Optional.of(0));
proxyConfig.setWebServicePort(Optional.of(0));
proxyConfig.setWebServicePortTls(Optional.of(0));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
index 4a7d0b8..830a251 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
@@ -97,6 +97,7 @@ public class ProxyWithJwtAuthorizationTest extends ProducerConsumerBase {
proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
proxyConfig.setServicePort(Optional.of(0));
+ proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setWebServicePort(Optional.of(0));
// enable auth&auth and use JWT at proxy
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
index d59b6c9..e1b30dc 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
@@ -102,6 +102,7 @@ public class ProxyWithoutServiceDiscoveryTest extends ProducerConsumerBase {
proxyConfig.setBrokerServiceURLTLS(pulsar.getBrokerServiceUrlTls());
proxyConfig.setServicePort(Optional.of(0));
+ proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setServicePortTls(Optional.of(0));
proxyConfig.setWebServicePort(Optional.of(0));
proxyConfig.setWebServicePortTls(Optional.of(0));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java
index 17c0992..e048d61 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java
@@ -76,6 +76,7 @@ public class SuperUserAuthedAdminProxyHandlerTest extends MockedPulsarServiceBas
proxyConfig.setAuthenticationEnabled(true);
proxyConfig.setAuthorizationEnabled(true);
proxyConfig.setServicePort(Optional.of(0));
+ proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setServicePortTls(Optional.of(0));
proxyConfig.setWebServicePort(Optional.of(0));
proxyConfig.setWebServicePortTls(Optional.of(0));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
index ce0be5b..4429f01 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
@@ -66,6 +66,7 @@ public class UnauthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest {
// start proxy service
proxyConfig.setServicePort(Optional.of(0));
+ proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setWebServicePort(Optional.of(0));
proxyConfig.setBrokerWebServiceURL(brokerUrl.toString());
proxyConfig.setStatusFilePath(STATUS_FILE_PATH);