You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/12/16 00:40:49 UTC
[pulsar] branch master updated: Enable CheckStyle Plugin in Pulsar proxy (#13343)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new cba8800 Enable CheckStyle Plugin in Pulsar proxy (#13343)
cba8800 is described below
commit cba8800de1013d0e8ac81f43ecb040a55978c358
Author: ZhangJian He <sh...@gmail.com>
AuthorDate: Thu Dec 16 08:39:28 2021 +0800
Enable CheckStyle Plugin in Pulsar proxy (#13343)
---
.../src/main/resources/pulsar/suppressions.xml | 2 +
pulsar-proxy/pom.xml | 13 +++
.../proxy/extensions/ExtensionsDefinitions.java | 5 +-
.../pulsar/proxy/extensions/ProxyExtension.java | 5 +-
.../proxy/extensions/ProxyExtensionMetadata.java | 3 +-
.../extensions/ProxyExtensionWithClassLoader.java | 7 +-
.../pulsar/proxy/extensions/ProxyExtensions.java | 13 ++-
.../proxy/extensions/ProxyExtensionsUtils.java | 14 ++--
...xtensionsDefinitions.java => package-info.java} | 19 +----
.../pulsar/proxy/server/AdminProxyHandler.java | 21 +++--
.../proxy/server/BrokerDiscoveryProvider.java | 8 +-
.../pulsar/proxy/server/DirectProxyHandler.java | 67 +++++++--------
.../pulsar/proxy/server/LookupProxyHandler.java | 97 ++++++++++------------
.../pulsar/proxy/server/ParserProxyHandler.java | 86 ++++++++++---------
.../apache/pulsar/proxy/server/ProxyClientCnx.java | 5 +-
.../pulsar/proxy/server/ProxyConfiguration.java | 30 +++----
.../pulsar/proxy/server/ProxyConnection.java | 44 +++++-----
.../pulsar/proxy/server/ProxyConnectionPool.java | 4 +-
.../apache/pulsar/proxy/server/ProxyService.java | 29 ++++---
.../pulsar/proxy/server/ProxyServiceStarter.java | 51 ++++++------
.../proxy/server/ServiceChannelInitializer.java | 13 ++-
.../org/apache/pulsar/proxy/server/WebServer.java | 13 +--
.../package-info.java} | 19 +----
.../org/apache/pulsar/proxy/stats/ProxyStats.java | 23 ++---
.../apache/pulsar/proxy/stats/RestException.java | 2 -
.../org/apache/pulsar/proxy/stats/TopicStats.java | 4 +-
.../package-info.java} | 19 +----
.../package-info.java} | 19 +----
.../server/ProxyConnectionThrottlingTest.java | 4 +-
.../proxy/server/ProxyLookupThrottlingTest.java | 4 +-
30 files changed, 292 insertions(+), 351 deletions(-)
diff --git a/buildtools/src/main/resources/pulsar/suppressions.xml b/buildtools/src/main/resources/pulsar/suppressions.xml
index 0b120bb..ef9658c 100644
--- a/buildtools/src/main/resources/pulsar/suppressions.xml
+++ b/buildtools/src/main/resources/pulsar/suppressions.xml
@@ -39,6 +39,8 @@
<!-- suppress most all checks expect below-->
<suppress checks="^(?!.*UnusedImports).*$" files=".*[\\/]src[\\/]test[\\/].*"/>
+ <suppress checks="IllegalImport" files="ProxyServiceStarter.java"/>
+
<!-- suppress all checks in the copied code -->
<suppress checks=".*" files=".+[\\/]com[\\/]scurrilous[\\/]circe[\\/].+\.java"/>
diff --git a/pulsar-proxy/pom.xml b/pulsar-proxy/pom.xml
index 0c8e649..529cc77 100644
--- a/pulsar-proxy/pom.xml
+++ b/pulsar-proxy/pom.xml
@@ -198,6 +198,19 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>checkstyle</id>
+ <phase>verify</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
<profiles>
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java
index 844c7ca..2cdbad6 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java
@@ -18,11 +18,10 @@
*/
package org.apache.pulsar.proxy.extensions;
-import lombok.Data;
-import lombok.experimental.Accessors;
-
import java.util.Map;
import java.util.TreeMap;
+import lombok.Data;
+import lombok.experimental.Accessors;
/**
* The collection of Proxy Extensions.
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtension.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtension.java
index b973e10..928a4b4 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtension.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtension.java
@@ -20,14 +20,13 @@ package org.apache.pulsar.proxy.extensions;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
+import java.net.InetSocketAddress;
+import java.util.Map;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
import org.apache.pulsar.proxy.server.ProxyConfiguration;
import org.apache.pulsar.proxy.server.ProxyService;
-import java.net.InetSocketAddress;
-import java.util.Map;
-
/**
* The extension interface for support additional extensions on Pulsar Proxy.
*/
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionMetadata.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionMetadata.java
index 632c841..935a393 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionMetadata.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionMetadata.java
@@ -18,11 +18,10 @@
*/
package org.apache.pulsar.proxy.extensions;
+import java.nio.file.Path;
import lombok.Data;
import lombok.NoArgsConstructor;
-import java.nio.file.Path;
-
/**
* The metadata of Proxy Extension.
*/
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionWithClassLoader.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionWithClassLoader.java
index 1f6924a..922c339 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionWithClassLoader.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionWithClassLoader.java
@@ -20,6 +20,9 @@ package org.apache.pulsar.proxy.extensions;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -27,10 +30,6 @@ import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.proxy.server.ProxyConfiguration;
import org.apache.pulsar.proxy.server.ProxyService;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Map;
-
/**
* A extension with its classloader.
*/
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensions.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensions.java
index ba3c383..14dae3a 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensions.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensions.java
@@ -21,22 +21,19 @@ package org.apache.pulsar.proxy.extensions;
import com.google.common.collect.ImmutableMap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
-
+import java.io.IOException;
+import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.HashSet;
-
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.proxy.server.ProxyConfiguration;
import org.apache.pulsar.proxy.server.ProxyService;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
/**
* A collection of loaded extensions.
*/
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionsUtils.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionsUtils.java
index 2f02827..f8a532d 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionsUtils.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionsUtils.java
@@ -18,12 +18,7 @@
*/
package org.apache.pulsar.proxy.extensions;
-import lombok.experimental.UtilityClass;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.common.nar.NarClassLoader;
-import org.apache.pulsar.common.util.ObjectMapperFactory;
-
+import static com.google.common.base.Preconditions.checkArgument;
import java.io.File;
import java.io.IOException;
import java.nio.file.DirectoryStream;
@@ -31,8 +26,11 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
-
-import static com.google.common.base.Preconditions.checkArgument;
+import lombok.experimental.UtilityClass;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
/**
* Util class to search and load {@link ProxyExtension}s.
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/package-info.java
similarity index 69%
copy from pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java
copy to pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/package-info.java
index 844c7ca..7ef1689 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/package-info.java
@@ -16,21 +16,4 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.proxy.extensions;
-
-import lombok.Data;
-import lombok.experimental.Accessors;
-
-import java.util.Map;
-import java.util.TreeMap;
-
-/**
- * The collection of Proxy Extensions.
- */
-@Data
-@Accessors(fluent = true)
-class ExtensionsDefinitions {
-
- private final Map<String, ProxyExtensionMetadata> extensions = new TreeMap<>();
-
-}
+package org.apache.pulsar.proxy.extensions;
\ No newline at end of file
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
index 853eb0b..a38e2f3 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
@@ -19,7 +19,6 @@
package org.apache.pulsar.proxy.server;
import static org.apache.commons.lang3.StringUtils.isBlank;
-
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -33,13 +32,11 @@ import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executor;
-
import javax.net.ssl.SSLContext;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
-
import org.apache.pulsar.broker.web.AuthenticationFilter;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
@@ -115,14 +112,16 @@ class AdminProxyHandler extends ProxyServlet {
String value = config.getInitParameter("maxThreads");
if (value == null || "-".equals(value)) {
executor = (Executor) getServletContext().getAttribute("org.eclipse.jetty.server.Executor");
- if (executor == null)
+ if (executor == null) {
throw new IllegalStateException("No server executor for proxy");
+ }
} else {
QueuedThreadPool qtp = new QueuedThreadPool(Integer.parseInt(value));
String servletName = config.getServletName();
int dot = servletName.lastIndexOf('.');
- if (dot >= 0)
+ if (dot >= 0) {
servletName = servletName.substring(dot + 1);
+ }
qtp.setName(servletName);
executor = qtp;
}
@@ -130,22 +129,26 @@ class AdminProxyHandler extends ProxyServlet {
client.setExecutor(executor);
value = config.getInitParameter("maxConnections");
- if (value == null)
+ if (value == null) {
value = "256";
+ }
client.setMaxConnectionsPerDestination(Integer.parseInt(value));
value = config.getInitParameter("idleTimeout");
- if (value == null)
+ if (value == null) {
value = "30000";
+ }
client.setIdleTimeout(Long.parseLong(value));
value = config.getInitParameter("requestBufferSize");
- if (value != null)
+ if (value != null) {
client.setRequestBufferSize(Integer.parseInt(value));
+ }
value = config.getInitParameter("responseBufferSize");
- if (value != null)
+ if (value != null){
client.setResponseBufferSize(Integer.parseInt(value));
+ }
try {
client.start();
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
index bb90be2..ae81350 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
@@ -19,7 +19,7 @@
package org.apache.pulsar.proxy.server;
import static org.apache.bookkeeper.common.util.MathUtils.signSafeMod;
-
+import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
@@ -27,7 +27,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.broker.PulsarServerException;
@@ -43,8 +42,6 @@ import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import io.netty.util.concurrent.DefaultThreadFactory;
-
/**
* Maintains available active broker list and returns next active broker in round-robin for discovery service.
* This is an API used by Proxy Extensions.
@@ -151,7 +148,8 @@ public class BrokerDiscoveryProvider implements Closeable {
throw new IllegalAccessException(String.format("Failed to get property %s admin data due to %s",
topicName.getTenant(), e.getMessage()));
}
- if (!service.getAuthorizationService().isTenantAdmin(topicName.getTenant(), role, tenantInfo, authenticationData).get()) {
+ if (!service.getAuthorizationService()
+ .isTenantAdmin(topicName.getTenant(), role, tenantInfo, authenticationData).get()) {
throw new IllegalAccessException("Don't have permission to administrate resources on this tenant");
}
}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index c896be5..ead243a 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.proxy.server;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
-
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
@@ -42,7 +41,6 @@ import io.netty.handler.ssl.SslHandler;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
-
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
@@ -50,16 +48,12 @@ import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
-
import javax.net.ssl.SSLSession;
-
import lombok.Getter;
-
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.common.tls.TlsHostnameVerifier;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
@@ -67,6 +61,7 @@ import org.apache.pulsar.common.api.proto.CommandConnected;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.PulsarDecoder;
import org.apache.pulsar.common.stats.Rate;
+import org.apache.pulsar.common.tls.TlsHostnameVerifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -152,25 +147,29 @@ public class DirectProxyHandler {
inboundOutboundChannelMap.put(outboundChannel.id() , inboundChannel.id());
}
- if (config.isHaProxyProtocolEnabled()) {
- if (proxyConnection.hasHAProxyMessage()) {
- outboundChannel.writeAndFlush(encodeProxyProtocolMessage(proxyConnection.getHAProxyMessage()));
- } else {
- if (inboundChannel.remoteAddress() instanceof InetSocketAddress) {
- InetSocketAddress clientAddress = (InetSocketAddress) inboundChannel.remoteAddress();
- String sourceAddress = clientAddress.getAddress().getHostAddress();
- int sourcePort = clientAddress.getPort();
- if (outboundChannel.localAddress() instanceof InetSocketAddress) {
- InetSocketAddress proxyAddress = (InetSocketAddress) inboundChannel.remoteAddress();
- String destinationAddress = proxyAddress.getAddress().getHostAddress();
- int destinationPort = proxyAddress.getPort();
- HAProxyMessage msg = new HAProxyMessage(HAProxyProtocolVersion.V1, HAProxyCommand.PROXY,
- HAProxyProxiedProtocol.TCP4, sourceAddress, destinationAddress, sourcePort, destinationPort);
- outboundChannel.writeAndFlush(encodeProxyProtocolMessage(msg));
- msg.release();
- }
- }
+ if (!config.isHaProxyProtocolEnabled()) {
+ return;
+ }
+
+ if (proxyConnection.hasHAProxyMessage()) {
+ outboundChannel.writeAndFlush(encodeProxyProtocolMessage(proxyConnection.getHAProxyMessage()));
+ } else {
+ if (!(inboundChannel.remoteAddress() instanceof InetSocketAddress)) {
+ return;
+ }
+ if (!(outboundChannel.localAddress() instanceof InetSocketAddress)) {
+ return;
}
+ InetSocketAddress clientAddress = (InetSocketAddress) inboundChannel.remoteAddress();
+ String sourceAddress = clientAddress.getAddress().getHostAddress();
+ int sourcePort = clientAddress.getPort();
+ InetSocketAddress proxyAddress = (InetSocketAddress) inboundChannel.remoteAddress();
+ String destinationAddress = proxyAddress.getAddress().getHostAddress();
+ int destinationPort = proxyAddress.getPort();
+ HAProxyMessage msg = new HAProxyMessage(HAProxyProtocolVersion.V1, HAProxyCommand.PROXY,
+ HAProxyProxiedProtocol.TCP4, sourceAddress, destinationAddress, sourcePort, destinationPort);
+ outboundChannel.writeAndFlush(encodeProxyProtocolMessage(msg));
+ msg.release();
}
});
}
@@ -246,9 +245,9 @@ public class DirectProxyHandler {
break;
case HandshakeCompleted:
- ProxyService.opsCounter.inc();
+ ProxyService.OPS_COUNTER.inc();
if (msg instanceof ByteBuf) {
- ProxyService.bytesCounter.inc(((ByteBuf) msg).readableBytes());
+ ProxyService.BYTES_COUNTER.inc(((ByteBuf) msg).readableBytes());
}
inboundChannel.writeAndFlush(msg).addListener(this);
break;
@@ -352,14 +351,16 @@ public class DirectProxyHandler {
// Enable parsing feature, proxyLogLevel(1 or 2)
// Add parser handler
if (connected.hasMaxMessageSize()) {
- inboundChannel.pipeline().replace("frameDecoder", "newFrameDecoder",
- new LengthFieldBasedFrameDecoder(connected.getMaxMessageSize()
- + Commands.MESSAGE_SIZE_FRAME_PADDING,
- 0, 4, 0, 4));
+ inboundChannel.pipeline()
+ .replace("frameDecoder", "newFrameDecoder",
+ new LengthFieldBasedFrameDecoder(connected.getMaxMessageSize()
+ + Commands.MESSAGE_SIZE_FRAME_PADDING,
+ 0, 4, 0, 4));
outboundChannel.pipeline().replace("frameDecoder", "newFrameDecoder",
- new LengthFieldBasedFrameDecoder(
- connected.getMaxMessageSize()
- + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
+ new LengthFieldBasedFrameDecoder(
+ connected.getMaxMessageSize()
+ + Commands.MESSAGE_SIZE_FRAME_PADDING,
+ 0, 4, 0, 4));
inboundChannel.pipeline().addBefore("handler", "inboundParser",
new ParserProxyHandler(service, inboundChannel,
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
index b14bea5..6319824 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
@@ -19,67 +19,64 @@
package org.apache.pulsar.proxy.server;
import static org.apache.commons.lang3.StringUtils.isBlank;
-
+import io.netty.buffer.ByteBuf;
+import io.prometheus.client.Counter;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Optional;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.common.protocol.Commands;
-import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.api.proto.CommandGetSchema;
+import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.api.proto.CommandLookupTopic;
import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse.LookupType;
import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import io.netty.buffer.ByteBuf;
-import io.prometheus.client.Counter;
-
public class LookupProxyHandler {
private final String throttlingErrorMessage = "Too many concurrent lookup and partitionsMetadata requests";
private final ProxyService service;
private final ProxyConnection proxyConnection;
private final boolean connectWithTLS;
- private SocketAddress clientAddress;
- private String brokerServiceURL;
+ private final SocketAddress clientAddress;
+ private final String brokerServiceURL;
- private static final Counter lookupRequests = Counter
+ private static final Counter LOOKUP_REQUESTS = Counter
.build("pulsar_proxy_lookup_requests", "Counter of topic lookup requests").create().register();
- private static final Counter partitionsMetadataRequests = Counter
+ private static final Counter PARTITIONS_METADATA_REQUESTS = Counter
.build("pulsar_proxy_partitions_metadata_requests", "Counter of partitions metadata requests").create()
.register();
- private static final Counter getTopicsOfNamespaceRequestss = Counter
+ private static final Counter GET_TOPICS_OF_NAMESPACE_REQUESTS = Counter
.build("pulsar_proxy_get_topics_of_namespace_requests", "Counter of getTopicsOfNamespace requests")
.create()
.register();
- private static final Counter getSchemaRequests = Counter
+ private static final Counter GET_SCHEMA_REQUESTS = Counter
.build("pulsar_proxy_get_schema_requests", "Counter of schema requests")
.create()
.register();
- static final Counter rejectedLookupRequests = Counter.build("pulsar_proxy_rejected_lookup_requests",
+ static final Counter REJECTED_LOOKUP_REQUESTS = Counter.build("pulsar_proxy_rejected_lookup_requests",
"Counter of topic lookup requests rejected due to throttling").create().register();
- static final Counter rejectedPartitionsMetadataRequests = Counter
+ static final Counter REJECTED_PARTITIONS_METADATA_REQUESTS = Counter
.build("pulsar_proxy_rejected_partitions_metadata_requests",
"Counter of partitions metadata requests rejected due to throttling")
.create().register();
- static final Counter rejectedGetTopicsOfNamespaceRequests = Counter
+ static final Counter REJECTED_GET_TOPICS_OF_NAMESPACE_REQUESTS = Counter
.build("pulsar_proxy_rejected_get_topics_of_namespace_requests",
"Counter of getTopicsOfNamespace requests rejected due to throttling")
.create().register();
@@ -99,7 +96,7 @@ public class LookupProxyHandler {
}
long clientRequestId = lookup.getRequestId();
if (this.service.getLookupRequestSemaphore().tryAcquire()) {
- lookupRequests.inc();
+ LOOKUP_REQUESTS.inc();
String topic = lookup.getTopic();
String serviceUrl;
if (isBlank(brokerServiceURL)) {
@@ -121,7 +118,7 @@ public class LookupProxyHandler {
performLookup(clientRequestId, topic, serviceUrl, false, 10);
this.service.getLookupRequestSemaphore().release();
} else {
- rejectedLookupRequests.inc();
+ REJECTED_LOOKUP_REQUESTS.inc();
if (log.isDebugEnabled()) {
log.debug("Lookup Request ID {} from {} rejected - {}.", clientRequestId, clientAddress,
throttlingErrorMessage);
@@ -179,9 +176,9 @@ public class LookupProxyHandler {
// to use the appropriate target broker (and port) when it
// will connect back.
if (log.isDebugEnabled()) {
- log.debug(
- "Successfully perform lookup '{}' for topic '{}' with clientReq Id '{}' and lookup-broker {}",
- addr, topic, clientRequestId, brokerUrl);
+ log.debug("Successfully perform lookup '{}' for topic '{}'"
+ + " with clientReq Id '{}' and lookup-broker {}",
+ addr, topic, clientRequestId, brokerUrl);
}
proxyConnection.ctx().writeAndFlush(Commands.newLookupResponse(brokerUrl, brokerUrl, true,
LookupType.Connect, clientRequestId, true /* this is coming from proxy */));
@@ -198,7 +195,7 @@ public class LookupProxyHandler {
}
public void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata partitionMetadata) {
- partitionsMetadataRequests.inc();
+ PARTITIONS_METADATA_REQUESTS.inc();
if (log.isDebugEnabled()) {
log.debug("[{}] Received PartitionMetadataLookup", clientAddress);
}
@@ -207,7 +204,7 @@ public class LookupProxyHandler {
handlePartitionMetadataResponse(partitionMetadata, clientRequestId);
this.service.getLookupRequestSemaphore().release();
} else {
- rejectedPartitionsMetadataRequests.inc();
+ REJECTED_PARTITIONS_METADATA_REQUESTS.inc();
if (log.isDebugEnabled()) {
log.debug("PartitionMetaData Request ID {} from {} rejected - {}.", clientRequestId, clientAddress,
throttlingErrorMessage);
@@ -270,7 +267,7 @@ public class LookupProxyHandler {
}
public void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace) {
- getTopicsOfNamespaceRequestss.inc();
+ GET_TOPICS_OF_NAMESPACE_REQUESTS.inc();
if (log.isDebugEnabled()) {
log.debug("[{}] Received GetTopicsOfNamespace", clientAddress);
}
@@ -281,7 +278,7 @@ public class LookupProxyHandler {
handleGetTopicsOfNamespace(commandGetTopicsOfNamespace, requestId);
this.service.getLookupRequestSemaphore().release();
} else {
- rejectedGetTopicsOfNamespaceRequests.inc();
+ REJECTED_GET_TOPICS_OF_NAMESPACE_REQUESTS.inc();
if (log.isDebugEnabled()) {
log.debug("GetTopicsOfNamespace Request ID {} from {} rejected - {}.", requestId, clientAddress,
throttlingErrorMessage);
@@ -296,11 +293,11 @@ public class LookupProxyHandler {
long clientRequestId) {
String serviceUrl = getBrokerServiceUrl(clientRequestId);
- if(!StringUtils.isNotBlank(serviceUrl)) {
+ if (!StringUtils.isNotBlank(serviceUrl)) {
return;
}
- performGetTopicsOfNamespace(clientRequestId, commandGetTopicsOfNamespace.getNamespace(), serviceUrl, 10,
- commandGetTopicsOfNamespace.getMode());
+ performGetTopicsOfNamespace(clientRequestId, commandGetTopicsOfNamespace.getNamespace(), serviceUrl,
+ 10, commandGetTopicsOfNamespace.getMode());
}
private void performGetTopicsOfNamespace(long clientRequestId,
@@ -316,7 +313,7 @@ public class LookupProxyHandler {
InetSocketAddress addr = getAddr(brokerServiceUrl, clientRequestId);
- if(addr == null){
+ if (addr == null) {
return;
}
@@ -331,7 +328,8 @@ public class LookupProxyHandler {
command = Commands.newGetTopicsOfNamespaceRequest(namespaceName, requestId, mode);
clientCnx.newGetTopicsOfNamespace(command, requestId).whenComplete((r, t) -> {
if (t != null) {
- log.warn("[{}] Failed to get TopicsOfNamespace {}: {}", clientAddress, namespaceName, t.getMessage());
+ log.warn("[{}] Failed to get TopicsOfNamespace {}: {}",
+ clientAddress, namespaceName, t.getMessage());
proxyConnection.ctx().writeAndFlush(
Commands.newError(clientRequestId, ServerError.ServiceNotReady, t.getMessage()));
} else {
@@ -350,7 +348,7 @@ public class LookupProxyHandler {
}
public void handleGetSchema(CommandGetSchema commandGetSchema) {
- getSchemaRequests.inc();
+ GET_SCHEMA_REQUESTS.inc();
if (log.isDebugEnabled()) {
log.debug("[{}] Received GetSchema {}", clientAddress, commandGetSchema);
}
@@ -365,12 +363,12 @@ public class LookupProxyHandler {
schemaVersion = Optional.empty();
}
- if(!StringUtils.isNotBlank(serviceUrl)) {
+ if (!StringUtils.isNotBlank(serviceUrl)) {
return;
}
InetSocketAddress addr = getAddr(serviceUrl, clientRequestId);
- if(addr == null){
+ if (addr == null) {
return;
}
if (log.isDebugEnabled()) {
@@ -405,27 +403,24 @@ public class LookupProxyHandler {
}
/**
- * Get default broker service url or discovery an available broker
+ * Get default broker service url or discover an available broker.
**/
private String getBrokerServiceUrl(long clientRequestId) {
- if (isBlank(brokerServiceURL)) {
- ServiceLookupData availableBroker;
- try {
- availableBroker = service.getDiscoveryProvider().nextBroker();
- } catch (Exception e) {
- log.warn("[{}] Failed to get next active broker {}", clientAddress, e.getMessage(), e);
- proxyConnection.ctx().writeAndFlush(Commands.newError(
- clientRequestId, ServerError.ServiceNotReady, e.getMessage()
- ));
- return null;
- }
- return this.connectWithTLS ?
- availableBroker.getPulsarServiceUrlTls() : availableBroker.getPulsarServiceUrl();
- } else {
- return this.connectWithTLS ?
- service.getConfiguration().getBrokerServiceURLTLS() : service.getConfiguration().getBrokerServiceURL();
+ if (StringUtils.isNotBlank(brokerServiceURL)) {
+ return this.connectWithTLS ? service.getConfiguration().getBrokerServiceURLTLS()
+ : service.getConfiguration().getBrokerServiceURL();
}
-
+ ServiceLookupData availableBroker;
+ try {
+ availableBroker = service.getDiscoveryProvider().nextBroker();
+ } catch (Exception e) {
+ log.warn("[{}] Failed to get next active broker {}", clientAddress, e.getMessage(), e);
+ proxyConnection.ctx().writeAndFlush(Commands.newError(
+ clientRequestId, ServerError.ServiceNotReady, e.getMessage()
+ ));
+ return null;
+ }
+ return this.connectWithTLS ? availableBroker.getPulsarServiceUrlTls() : availableBroker.getPulsarServiceUrl();
}
private InetSocketAddress getAddr(String brokerServiceUrl, long clientRequestId) {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
index f152b32..40c05a3 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
@@ -19,12 +19,18 @@
package org.apache.pulsar.proxy.server;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.raw.MessageParser;
@@ -34,34 +40,31 @@ import org.apache.pulsar.proxy.stats.TopicStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufUtil;
-import io.netty.buffer.CompositeByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-
public class ParserProxyHandler extends ChannelInboundHandlerAdapter {
- private Channel channel;
+ private final Channel channel;
//inbound
protected static final String FRONTEND_CONN = "frontendconn";
//outbound
protected static final String BACKEND_CONN = "backendconn";
- private String connType;
+ private final String connType;
- private int maxMessageSize;
+ private final int maxMessageSize;
private final ProxyService service;
- //producerid+channelid as key
- //or consumerid+channelid as key
- private static Map<String, String> producerHashMap = new ConcurrentHashMap<>();
- private static Map<String, String> consumerHashMap = new ConcurrentHashMap<>();
+ /**
+ * producerid + channelid as key.
+ */
+ private static final Map<String, String> producerHashMap = new ConcurrentHashMap<>();
+
+ /**
+ * consumerid + channelid as key.
+ */
+ private static final Map<String, String> consumerHashMap = new ConcurrentHashMap<>();
public ParserProxyHandler(ProxyService service, Channel channel, String type, int maxMessageSize) {
this.service = service;
@@ -70,23 +73,26 @@ public class ParserProxyHandler extends ChannelInboundHandlerAdapter {
this.maxMessageSize = maxMessageSize;
}
- private void logging(Channel conn, BaseCommand.Type cmdtype, String info, List<RawMessage> messages) throws Exception{
+ private void logging(Channel conn, BaseCommand.Type cmdtype, String info, List<RawMessage> messages) {
if (messages != null) {
// lag
StringBuilder infoBuilder = new StringBuilder(info);
for (RawMessage message : messages) {
- infoBuilder.append("[").append(System.currentTimeMillis() - message.getPublishTime()).append("] ").append(new String(ByteBufUtil.getBytes(message.getData()), StandardCharsets.UTF_8));
+ infoBuilder.append("[").append(System.currentTimeMillis() - message.getPublishTime()).append("] ")
+ .append(new String(ByteBufUtil.getBytes(message.getData()), StandardCharsets.UTF_8));
}
info = infoBuilder.toString();
}
// log conn format is like from source to target
switch (this.connType) {
case ParserProxyHandler.FRONTEND_CONN:
- log.info(ParserProxyHandler.FRONTEND_CONN + ":{} cmd:{} msg:{}", "[" + conn.remoteAddress() + conn.localAddress() + "]", cmdtype, info);
+ log.info(ParserProxyHandler.FRONTEND_CONN + ":{} cmd:{} msg:{}", "[" + conn.remoteAddress()
+ + conn.localAddress() + "]", cmdtype, info);
break;
case ParserProxyHandler.BACKEND_CONN:
- log.info(ParserProxyHandler.BACKEND_CONN + ":{} cmd:{} msg:{}", "[" + conn.localAddress() + conn.remoteAddress() + "]", cmdtype, info);
+ log.info(ParserProxyHandler.BACKEND_CONN + ":{} cmd:{} msg:{}", "[" + conn.localAddress()
+ + conn.remoteAddress() + "]", cmdtype, info);
break;
}
}
@@ -94,9 +100,9 @@ public class ParserProxyHandler extends ChannelInboundHandlerAdapter {
private final BaseCommand cmd = new BaseCommand();
public void channelRead(ChannelHandlerContext ctx, Object msg) {
- TopicName topicName ;
+ TopicName topicName;
List<RawMessage> messages = new ArrayList<>();
- ByteBuf buffer = (ByteBuf)(msg);
+ ByteBuf buffer = (ByteBuf) (msg);
try {
buffer.markReaderIndex();
@@ -107,9 +113,11 @@ public class ParserProxyHandler extends ChannelInboundHandlerAdapter {
switch (cmd.getType()) {
case PRODUCER:
- ParserProxyHandler.producerHashMap.put(String.valueOf(cmd.getProducer().getProducerId()) + "," + String.valueOf(ctx.channel().id()), cmd.getProducer().getTopic());
+ ParserProxyHandler.producerHashMap.put(cmd.getProducer().getProducerId() + "," + ctx.channel().id(),
+ cmd.getProducer().getTopic());
- logging(ctx.channel() , cmd.getType() , "{producer:" + cmd.getProducer().getProducerName() + ",topic:" + cmd.getProducer().getTopic() + "}", null);
+ logging(ctx.channel() , cmd.getType() , "{producer:" + cmd.getProducer().getProducerName()
+ + ",topic:" + cmd.getProducer().getTopic() + "}", null);
break;
case SEND:
@@ -117,10 +125,11 @@ public class ParserProxyHandler extends ChannelInboundHandlerAdapter {
logging(ctx.channel() , cmd.getType() , "", null);
break;
}
- topicName = TopicName.get(ParserProxyHandler.producerHashMap.get(String.valueOf(cmd.getSend().getProducerId()) + "," + String.valueOf(ctx.channel().id())));
+ topicName = TopicName.get(ParserProxyHandler.producerHashMap.get(cmd.getSend().getProducerId() + ","
+ + ctx.channel().id()));
MutableLong msgBytes = new MutableLong(0);
- MessageParser.parseMessage(topicName, -1L,
- -1L,buffer,(message) -> {
+ MessageParser.parseMessage(topicName, -1L,
+ -1L, buffer, (message) -> {
messages.add(message);
msgBytes.add(message.getData().readableBytes());
}, maxMessageSize);
@@ -132,9 +141,11 @@ public class ParserProxyHandler extends ChannelInboundHandlerAdapter {
break;
case SUBSCRIBE:
- ParserProxyHandler.consumerHashMap.put(String.valueOf(cmd.getSubscribe().getConsumerId()) + "," + String.valueOf(ctx.channel().id()) , cmd.getSubscribe().getTopic());
+ ParserProxyHandler.consumerHashMap.put(cmd.getSubscribe().getConsumerId() + ","
+ + ctx.channel().id(), cmd.getSubscribe().getTopic());
- logging(ctx.channel() , cmd.getType() , "{consumer:" + cmd.getSubscribe().getConsumerName() + ",topic:" + cmd.getSubscribe().getTopic() + "}" , null);
+ logging(ctx.channel() , cmd.getType() , "{consumer:" + cmd.getSubscribe().getConsumerName()
+ + ",topic:" + cmd.getSubscribe().getTopic() + "}" , null);
break;
case MESSAGE:
@@ -142,13 +153,14 @@ public class ParserProxyHandler extends ChannelInboundHandlerAdapter {
logging(ctx.channel() , cmd.getType() , "" , null);
break;
}
- topicName = TopicName.get(ParserProxyHandler.consumerHashMap.get(String.valueOf(cmd.getMessage().getConsumerId()) + "," + DirectProxyHandler.inboundOutboundChannelMap.get(ctx.channel().id())));
+ topicName = TopicName.get(ParserProxyHandler.consumerHashMap.get(cmd.getMessage().getConsumerId()
+ + "," + DirectProxyHandler.inboundOutboundChannelMap.get(ctx.channel().id())));
msgBytes = new MutableLong(0);
- MessageParser.parseMessage(topicName, -1L,
- -1L,buffer,(message) -> {
- messages.add(message);
- msgBytes.add(message.getData().readableBytes());
- }, maxMessageSize);
+ MessageParser.parseMessage(topicName, -1L,
+ -1L, buffer, (message) -> {
+ messages.add(message);
+ msgBytes.add(message.getData().readableBytes());
+ }, maxMessageSize);
// update topic stats
topicStats = this.service.getTopicStats().computeIfAbsent(topicName.toString(),
topic -> new TopicStats());
@@ -172,8 +184,8 @@ public class ParserProxyHandler extends ChannelInboundHandlerAdapter {
ByteBuf totalSizeBuf = Unpooled.buffer(4);
totalSizeBuf.writeInt(buffer.readableBytes());
CompositeByteBuf compBuf = Unpooled.compositeBuffer();
- compBuf.addComponents(totalSizeBuf,buffer);
- compBuf.writerIndex(totalSizeBuf.capacity()+buffer.capacity());
+ compBuf.addComponents(totalSizeBuf, buffer);
+ compBuf.writerIndex(totalSizeBuf.capacity() + buffer.capacity());
// Release mssages
messages.forEach(RawMessage::release);
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
index 665b9f8..50a77d3 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.proxy.server;
import io.netty.buffer.ByteBuf;
import io.netty.channel.EventLoopGroup;
-
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
@@ -48,8 +47,8 @@ public class ProxyClientCnx extends ClientCnx {
@Override
protected ByteBuf newConnectCommand() throws Exception {
if (log.isDebugEnabled()) {
- log.debug("New Connection opened via ProxyClientCnx with params clientAuthRole = {}," +
- " clientAuthData = {}, clientAuthMethod = {}",
+ log.debug("New Connection opened via ProxyClientCnx with params clientAuthRole = {},"
+ + " clientAuthData = {}, clientAuthMethod = {}",
clientAuthRole, clientAuthData, clientAuthMethod);
}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index 5238c46..41ec92f 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -26,10 +26,9 @@ import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
-import java.util.stream.Collectors;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-
+import java.util.stream.Collectors;
import lombok.Getter;
import lombok.Setter;
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
@@ -149,7 +148,7 @@ public class ProxyConfiguration implements PulsarConfiguration {
)
private String advertisedAddress;
- @FieldContext(category=CATEGORY_SERVER,
+ @FieldContext(category = CATEGORY_SERVER,
doc = "Enable or disable the proxy protocol.")
private boolean haProxyProtocolEnabled;
@@ -300,7 +299,7 @@ public class ProxyConfiguration implements PulsarConfiguration {
)
private String anonymousUserRole = null;
- /***** --- TLS --- ****/
+ // TLS
@Deprecated
private boolean tlsEnabledInProxy = false;
@@ -361,7 +360,7 @@ public class ProxyConfiguration implements PulsarConfiguration {
)
private boolean tlsRequireTrustedClientCertOnConnect = false;
- /**** --- KeyStore TLS config variables --- ****/
+ // KeyStore TLS config variables
@FieldContext(
category = CATEGORY_KEYSTORE_TLS,
@@ -411,7 +410,9 @@ public class ProxyConfiguration implements PulsarConfiguration {
)
private String tlsTrustStorePassword = null;
- /**** --- KeyStore TLS config variables used for proxy to auth with broker--- ****/
+ /**
+ * KeyStore TLS config variables used for proxy to auth with broker.
+ */
@FieldContext(
category = CATEGORY_KEYSTORE_TLS,
doc = "Whether the Pulsar proxy use KeyStore type to authenticate with Pulsar brokers"
@@ -459,7 +460,7 @@ public class ProxyConfiguration implements PulsarConfiguration {
)
private Set<String> brokerClientTlsProtocols = new TreeSet<>();
- /***** --- HTTP --- ****/
+ // HTTP
@FieldContext(
category = CATEGORY_HTTP,
@@ -478,7 +479,7 @@ public class ProxyConfiguration implements PulsarConfiguration {
+ "proxy, this should be set to the minimum value, 1, so that clients "
+ "see the data as soon as possible."
)
- private int httpOutputBufferSize = 32*1024;
+ private int httpOutputBufferSize = 32 * 1024;
@FieldContext(
minValue = 1,
@@ -532,15 +533,16 @@ public class ProxyConfiguration implements PulsarConfiguration {
private Set<String> additionalServlets = new TreeSet<>();
@FieldContext(
- category = CATEGORY_HTTP,
+ category = CATEGORY_HTTP,
doc = "Enable the enforcement of limits on the incoming HTTP requests"
- )
+ )
private boolean httpRequestsLimitEnabled = false;
@FieldContext(
- category = CATEGORY_HTTP,
- doc = "Max HTTP requests per seconds allowed. The excess of requests will be rejected with HTTP code 429 (Too many requests)"
- )
+ category = CATEGORY_HTTP,
+ doc = "Max HTTP requests per seconds allowed."
+ + " The excess of requests will be rejected with HTTP code 429 (Too many requests)"
+ )
private double httpRequestsMaxPerSecond = 100.0;
@PropertiesContext(
@@ -587,7 +589,7 @@ public class ProxyConfiguration implements PulsarConfiguration {
)
private boolean useSeparateThreadPoolForProxyExtensions = true;
- /***** --- WebSocket --- ****/
+ /***** --- WebSocket. --- ****/
@FieldContext(
category = CATEGORY_WEBSOCKET,
doc = "Enable or disable the WebSocket servlet"
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 87d84f7..a6dba1c 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -19,15 +19,19 @@
package org.apache.pulsar.proxy.server;
import static com.google.common.base.Preconditions.checkArgument;
-
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.haproxy.HAProxyMessage;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.FutureListener;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
-
import java.util.function.Supplier;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
-
-import io.netty.handler.codec.haproxy.HAProxyMessage;
+import lombok.Getter;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationState;
@@ -40,29 +44,21 @@ import org.apache.pulsar.client.impl.PulsarChannelInitializer;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.api.AuthData;
-import org.apache.pulsar.common.protocol.Commands;
-import org.apache.pulsar.common.protocol.PulsarHandler;
import org.apache.pulsar.common.api.proto.CommandAuthResponse;
import org.apache.pulsar.common.api.proto.CommandConnect;
+import org.apache.pulsar.common.api.proto.CommandGetSchema;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.api.proto.CommandLookupTopic;
-import org.apache.pulsar.common.api.proto.CommandGetSchema;
import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.api.proto.ServerError;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.PulsarHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.ssl.SslHandler;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.FutureListener;
-import lombok.Getter;
-
/**
- * Handles incoming discovery request from client and sends appropriate response back to client
+ * Handles incoming discovery request from client and sends appropriate response back to client.
*
*/
public class ProxyConnection extends PulsarHandler implements FutureListener<Void> {
@@ -125,10 +121,10 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
super.channelRegistered(ctx);
- ProxyService.activeConnections.inc();
- if (ProxyService.activeConnections.get() > service.getConfiguration().getMaxConcurrentInboundConnections()) {
+ ProxyService.ACTIVE_CONNECTIONS.inc();
+ if (ProxyService.ACTIVE_CONNECTIONS.get() > service.getConfiguration().getMaxConcurrentInboundConnections()) {
ctx.close();
- ProxyService.rejectedConnections.inc();
+ ProxyService.REJECTED_CONNECTIONS.inc();
return;
}
}
@@ -136,13 +132,13 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
super.channelUnregistered(ctx);
- ProxyService.activeConnections.dec();
+ ProxyService.ACTIVE_CONNECTIONS.dec();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
- ProxyService.newConnections.inc();
+ ProxyService.NEW_CONNECTIONS.inc();
service.getClientCnxs().add(this);
LOG.info("[{}] New connection opened", remoteAddress);
}
@@ -195,11 +191,11 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
case ProxyConnectionToBroker:
// Pass the buffer to the outbound connection and schedule next read
// only if we can write on the connection
- ProxyService.opsCounter.inc();
+ ProxyService.OPS_COUNTER.inc();
if (msg instanceof ByteBuf) {
int bytes = ((ByteBuf) msg).readableBytes();
directProxyHandler.getInboundChannelRequestsRate().recordEvent(bytes);
- ProxyService.bytesCounter.inc(bytes);
+ ProxyService.BYTES_COUNTER.inc(bytes);
}
directProxyHandler.outboundChannel.writeAndFlush(msg).addListener(this);
break;
@@ -400,7 +396,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
}
/**
- * handles discovery request from client ands sends next active broker address
+ * handles discovery request from client ands sends next active broker address.
*/
@Override
protected void handleLookup(CommandLookupTopic lookup) {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnectionPool.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnectionPool.java
index cd1b31d..4dcb095 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnectionPool.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnectionPool.java
@@ -18,10 +18,10 @@
*/
package org.apache.pulsar.proxy.server;
+import io.netty.channel.EventLoopGroup;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
-
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConnectionPool;
@@ -29,8 +29,6 @@ import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import io.netty.channel.EventLoopGroup;
-
public class ProxyConnectionPool extends ConnectionPool {
public ProxyConnectionPool(ClientConfigurationData clientConfig, EventLoopGroup eventLoopGroup,
Supplier<ClientCnx> clientCnxSupplier) throws PulsarClientException {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 778f42b..2af7ebf 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -28,9 +28,9 @@ import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
-import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
+import io.netty.util.concurrent.DefaultThreadFactory;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import java.io.Closeable;
@@ -69,7 +69,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Pulsar proxy service
+ * Pulsar proxy service.
*/
public class ProxyService implements Closeable {
@@ -108,22 +108,22 @@ public class ProxyService implements Closeable {
private static final int numThreads = Runtime.getRuntime().availableProcessors();
- static final Gauge activeConnections = Gauge
+ static final Gauge ACTIVE_CONNECTIONS = Gauge
.build("pulsar_proxy_active_connections", "Number of connections currently active in the proxy").create()
.register();
- static final Counter newConnections = Counter
+ static final Counter NEW_CONNECTIONS = Counter
.build("pulsar_proxy_new_connections", "Counter of connections being opened in the proxy").create()
.register();
- static final Counter rejectedConnections = Counter
+ static final Counter REJECTED_CONNECTIONS = Counter
.build("pulsar_proxy_rejected_connections", "Counter for connections rejected due to throttling").create()
.register();
- static final Counter opsCounter = Counter
+ static final Counter OPS_COUNTER = Counter
.build("pulsar_proxy_binary_ops", "Counter of proxy operations").create().register();
- static final Counter bytesCounter = Counter
+ static final Counter BYTES_COUNTER = Counter
.build("pulsar_proxy_binary_bytes", "Counter of proxy bytes").create().register();
@Getter
@@ -137,11 +137,12 @@ public class ProxyService implements Closeable {
AuthenticationService authenticationService) throws Exception {
requireNonNull(proxyConfig);
this.proxyConfig = proxyConfig;
- this.timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-timer", Thread.currentThread().isDaemon()), 1, TimeUnit.MILLISECONDS);
+ this.timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-timer",
+ Thread.currentThread().isDaemon()), 1, TimeUnit.MILLISECONDS);
this.clientCnxs = Sets.newConcurrentHashSet();
this.topicStats = new ConcurrentHashMap<>();
- this.lookupRequestSemaphore = new AtomicReference<Semaphore>(
+ this.lookupRequestSemaphore = new AtomicReference<>(
new Semaphore(proxyConfig.getMaxConcurrentLookupRequests(), false));
if (proxyConfig.getProxyLogLevel().isPresent()) {
@@ -208,7 +209,8 @@ public class ProxyService implements Closeable {
// Bind and start to accept incoming connections.
if (proxyConfig.getServicePort().isPresent()) {
try {
- listenChannel = bootstrap.bind(proxyConfig.getBindAddress(), proxyConfig.getServicePort().get()).sync().channel();
+ listenChannel = bootstrap.bind(proxyConfig.getBindAddress(),
+ proxyConfig.getServicePort().get()).sync().channel();
LOG.info("Started Pulsar Proxy at {}", listenChannel.localAddress());
} catch (Exception e) {
throw new IOException("Failed to bind Pulsar Proxy on port " + proxyConfig.getServicePort().get(), e);
@@ -218,7 +220,8 @@ public class ProxyService implements Closeable {
if (proxyConfig.getServicePortTls().isPresent()) {
ServerBootstrap tlsBootstrap = bootstrap.clone();
tlsBootstrap.childHandler(new ServiceChannelInitializer(this, proxyConfig, true));
- listenChannelTls = tlsBootstrap.bind(proxyConfig.getBindAddress(), proxyConfig.getServicePortTls().get()).sync().channel();
+ listenChannelTls = tlsBootstrap.bind(proxyConfig.getBindAddress(),
+ proxyConfig.getServicePortTls().get()).sync().channel();
LOG.info("Started Pulsar TLS Proxy on {}", listenChannelTls.localAddress());
}
@@ -247,8 +250,8 @@ public class ProxyService implements Closeable {
}
// This call is used for starting additional protocol handlers
- public void startProxyExtensions(
- Map<String, Map<InetSocketAddress, ChannelInitializer<SocketChannel>>> protocolHandlers, ServerBootstrap serverBootstrap) {
+ public void startProxyExtensions(Map<String, Map<InetSocketAddress,
+ ChannelInitializer<SocketChannel>>> protocolHandlers, ServerBootstrap serverBootstrap) {
protocolHandlers.forEach((extensionName, initializers) -> {
initializers.forEach((address, initializer) -> {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
index 315607e..3d81317 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
@@ -25,16 +25,30 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.common.stats.JvmMetrics.getJvmDirectMemoryUsed;
import static org.slf4j.bridge.SLF4JBridgeHandler.install;
import static org.slf4j.bridge.SLF4JBridgeHandler.removeHandlersForRootLogger;
-
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
import com.google.common.annotations.VisibleForTesting;
+import io.netty.util.internal.PlatformDependent;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.Gauge;
+import io.prometheus.client.Gauge.Child;
+import io.prometheus.client.exporter.MetricsServlet;
+import io.prometheus.client.hotspot.DefaultExports;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
import org.apache.logging.log4j.core.util.datetime.FixedDateFormat;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationService;
-import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServletWithClassLoader;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.configuration.VipStatus;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.util.CmdGenerateDocs;
+import org.apache.pulsar.proxy.stats.ProxyStats;
import org.apache.pulsar.websocket.WebSocketConsumerServlet;
import org.apache.pulsar.websocket.WebSocketPingPongServlet;
import org.apache.pulsar.websocket.WebSocketProducerServlet;
@@ -46,27 +60,9 @@ import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-
-import io.netty.util.internal.PlatformDependent;
-import io.prometheus.client.CollectorRegistry;
-import io.prometheus.client.Gauge;
-import io.prometheus.client.Gauge.Child;
-import io.prometheus.client.exporter.MetricsServlet;
-import io.prometheus.client.hotspot.DefaultExports;
-import org.apache.pulsar.common.configuration.VipStatus;
-import org.apache.pulsar.proxy.stats.ProxyStats;
-
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-
/**
- * Starts an instance of the Pulsar ProxyService
+ * Starts an instance of the Pulsar ProxyService.
*
*/
public class ProxyServiceStarter {
@@ -107,7 +103,8 @@ public class ProxyServiceStarter {
DateFormat dateFormat = new SimpleDateFormat(
FixedDateFormat.FixedFormat.ISO8601_OFFSET_DATE_TIME_HHMM.getPattern());
Thread.setDefaultUncaughtExceptionHandler((thread, exception) -> {
- System.out.println(String.format("%s [%s] error Uncaught exception in thread %s: %s", dateFormat.format(new Date()), thread.getContextClassLoader(), thread.getName(), exception.getMessage()));
+ System.out.printf("%s [%s] error Uncaught exception in thread %s: %s%n", dateFormat.format(new Date()),
+ thread.getContextClassLoader(), thread.getName(), exception.getMessage());
exception.printStackTrace(System.out);
});
@@ -211,10 +208,10 @@ public class ProxyServiceStarter {
public void close() {
try {
- if(proxyService != null) {
+ if (proxyService != null) {
proxyService.close();
}
- if(server != null) {
+ if (server != null) {
server.stop();
}
} catch (Exception e) {
@@ -226,10 +223,12 @@ public class ProxyServiceStarter {
ProxyConfiguration config,
ProxyService service,
BrokerDiscoveryProvider discoveryProvider) throws Exception {
- server.addServlet("/metrics", new ServletHolder(MetricsServlet.class), Collections.emptyList(), config.isAuthenticateMetricsEndpoint());
+ server.addServlet("/metrics", new ServletHolder(MetricsServlet.class),
+ Collections.emptyList(), config.isAuthenticateMetricsEndpoint());
server.addRestResources("/", VipStatus.class.getPackage().getName(),
VipStatus.ATTRIBUTE_STATUS_FILE_PATH, config.getStatusFilePath());
- server.addRestResources("/proxy-stats", ProxyStats.class.getPackage().getName(), ProxyStats.ATTRIBUTE_PULSAR_PROXY_NAME, service);
+ server.addRestResources("/proxy-stats", ProxyStats.class.getPackage().getName(),
+ ProxyStats.ATTRIBUTE_PULSAR_PROXY_NAME, service);
AdminProxyHandler adminProxyHandler = new AdminProxyHandler(config, discoveryProvider);
ServletHolder servletHolder = new ServletHolder(adminProxyHandler);
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
index 658dd87..55f0b69 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
@@ -19,7 +19,10 @@
package org.apache.pulsar.proxy.server;
import static org.apache.commons.lang3.StringUtils.isEmpty;
-
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import java.util.function.Supplier;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
@@ -28,11 +31,6 @@ import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.OptionalProxyProtocolDecoder;
import org.apache.pulsar.common.util.NettyClientSslContextRefresher;
import org.apache.pulsar.common.util.NettyServerSslContextBuilder;
-
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-import io.netty.handler.ssl.SslContext;
import org.apache.pulsar.common.util.SslContextAutoRefreshBuilder;
import org.apache.pulsar.common.util.keystoretls.NettySSLContextAutoRefreshBuilder;
@@ -77,7 +75,8 @@ public class ServiceChannelInitializer extends ChannelInitializer<SocketChannel>
} else {
serverSslCtxRefresher = new NettyServerSslContextBuilder(serviceConfig.isTlsAllowInsecureConnection(),
serviceConfig.getTlsTrustCertsFilePath(), serviceConfig.getTlsCertificateFilePath(),
- serviceConfig.getTlsKeyFilePath(), serviceConfig.getTlsCiphers(), serviceConfig.getTlsProtocols(),
+ serviceConfig.getTlsKeyFilePath(), serviceConfig.getTlsCiphers(),
+ serviceConfig.getTlsProtocols(),
serviceConfig.isTlsRequireTrustedClientCertOnConnect(),
serviceConfig.getTlsCertRefreshCheckDurationSec());
}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
index c92f722..687bfaf 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
@@ -31,9 +31,9 @@ import javax.servlet.DispatcherType;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.web.AuthenticationFilter;
+import org.apache.pulsar.broker.web.JettyRequestLogFactory;
import org.apache.pulsar.broker.web.JsonMapperProvider;
import org.apache.pulsar.broker.web.RateLimitingFilter;
-import org.apache.pulsar.broker.web.JettyRequestLogFactory;
import org.apache.pulsar.broker.web.WebExecutorThreadPool;
import org.apache.pulsar.common.util.SecurityUtility;
import org.apache.pulsar.common.util.keystoretls.KeyStoreSSLContext;
@@ -76,7 +76,7 @@ public class WebServer {
private ServerConnector connector;
private ServerConnector connectorTls;
- public WebServer(ProxyConfiguration config, AuthenticationService authenticationService) throws IOException {
+ public WebServer(ProxyConfiguration config, AuthenticationService authenticationService) {
this.webServiceExecutor = new WebExecutorThreadPool(config.getHttpNumThreads(), "pulsar-external-web");
this.server = new Server(webServiceExecutor);
this.authenticationService = authenticationService;
@@ -84,12 +84,12 @@ public class WebServer {
List<ServerConnector> connectors = new ArrayList<>();
- HttpConfiguration http_config = new HttpConfiguration();
- http_config.setOutputBufferSize(config.getHttpOutputBufferSize());
+ HttpConfiguration httpConfig = new HttpConfiguration();
+ httpConfig.setOutputBufferSize(config.getHttpOutputBufferSize());
if (config.getWebServicePort().isPresent()) {
this.externalServicePort = config.getWebServicePort().get();
- connector = new ServerConnector(server, 1, 1, new HttpConnectionFactory(http_config));
+ connector = new ServerConnector(server, 1, 1, new HttpConnectionFactory(httpConfig));
connector.setHost(config.getBindAddress());
connector.setPort(externalServicePort);
connectors.add(connector);
@@ -146,7 +146,8 @@ public class WebServer {
addServlet(basePath, servletHolder, attributes, true);
}
- public void addServlet(String basePath, ServletHolder servletHolder, List<Pair<String, Object>> attributes, boolean requireAuthentication) {
+ public void addServlet(String basePath, ServletHolder servletHolder,
+ List<Pair<String, Object>> attributes, boolean requireAuthentication) {
Optional<String> existingPath = servletPaths.stream().filter(p -> p.startsWith(basePath)).findFirst();
if (existingPath.isPresent()) {
throw new IllegalArgumentException(
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/package-info.java
similarity index 69%
copy from pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java
copy to pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/package-info.java
index 844c7ca..07a8290 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/package-info.java
@@ -16,21 +16,4 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.proxy.extensions;
-
-import lombok.Data;
-import lombok.experimental.Accessors;
-
-import java.util.Map;
-import java.util.TreeMap;
-
-/**
- * The collection of Proxy Extensions.
- */
-@Data
-@Accessors(fluent = true)
-class ExtensionsDefinitions {
-
- private final Map<String, ProxyExtensionMetadata> extensions = new TreeMap<>();
-
-}
+package org.apache.pulsar.proxy.server;
\ No newline at end of file
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/ProxyStats.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/ProxyStats.java
index f709a25..d021389 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/ProxyStats.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/ProxyStats.java
@@ -18,28 +18,27 @@
*/
package org.apache.pulsar.proxy.stats;
+import io.netty.channel.Channel;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-
import javax.servlet.ServletContext;
import javax.ws.rs.GET;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
import javax.ws.rs.POST;
+import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response.Status;
-
import org.apache.pulsar.proxy.server.ProxyService;
-import io.netty.channel.Channel;
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiOperation;
-import io.swagger.annotations.ApiResponse;
-import io.swagger.annotations.ApiResponses;
+
@Path("/")
@Api(value = "/proxy-stats", description = "Stats for proxy", tags = "proxy-stats", hidden = true)
@@ -55,7 +54,8 @@ public class ProxyStats {
@GET
@Path("/connections")
- @ApiOperation(value = "Proxy stats api to get info for live connections", response = List.class, responseContainer = "List")
+ @ApiOperation(value = "Proxy stats api to get info for live connections",
+ response = List.class, responseContainer = "List")
@ApiResponses(value = { @ApiResponse(code = 503, message = "Proxy service is not initialized") })
public List<ConnectionStats> metrics() {
List<ConnectionStats> stats = new ArrayList<>();
@@ -88,7 +88,8 @@ public class ProxyStats {
@POST
@Path("/logging/{logLevel}")
- @ApiOperation(hidden = true, value = "Change proxy logging level dynamically", notes = "It only changes log-level in memory, change it config file to persist the change")
+ @ApiOperation(hidden = true, value = "Change proxy logging level dynamically",
+ notes = "It only changes log-level in memory, change it config file to persist the change")
@ApiResponses(value = { @ApiResponse(code = 412, message = "Proxy log level can be [0-2]"), })
public void updateProxyLogLevel(@PathParam("logLevel") int logLevel) {
if (logLevel < 0 || logLevel > 2) {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/RestException.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/RestException.java
index 9769588..7ba8d99 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/RestException.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/RestException.java
@@ -20,11 +20,9 @@ package org.apache.pulsar.proxy.stats;
import java.io.PrintWriter;
import java.io.StringWriter;
-
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
-
import org.apache.pulsar.common.policies.data.ErrorData;
/**
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/TopicStats.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/TopicStats.java
index 09b04fc..1c808a8 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/TopicStats.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/TopicStats.java
@@ -18,11 +18,9 @@
*/
package org.apache.pulsar.proxy.stats;
-import org.apache.pulsar.common.stats.Rate;
-
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-
import lombok.Getter;
+import org.apache.pulsar.common.stats.Rate;
@Getter
@JsonIgnoreProperties(value = { "msgInRate", "msgOutRate" })
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/package-info.java
similarity index 69%
copy from pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java
copy to pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/package-info.java
index 844c7ca..2eb5cf8 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/package-info.java
@@ -16,21 +16,4 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.proxy.extensions;
-
-import lombok.Data;
-import lombok.experimental.Accessors;
-
-import java.util.Map;
-import java.util.TreeMap;
-
-/**
- * The collection of Proxy Extensions.
- */
-@Data
-@Accessors(fluent = true)
-class ExtensionsDefinitions {
-
- private final Map<String, ProxyExtensionMetadata> extensions = new TreeMap<>();
-
-}
+package org.apache.pulsar.proxy.stats;
\ No newline at end of file
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/util/package-info.java
similarity index 69%
copy from pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java
copy to pulsar-proxy/src/main/java/org/apache/pulsar/proxy/util/package-info.java
index 844c7ca..eddea63 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/util/package-info.java
@@ -16,21 +16,4 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.proxy.extensions;
-
-import lombok.Data;
-import lombok.experimental.Accessors;
-
-import java.util.Map;
-import java.util.TreeMap;
-
-/**
- * The collection of Proxy Extensions.
- */
-@Data
-@Accessors(fluent = true)
-class ExtensionsDefinitions {
-
- private final Map<String, ProxyExtensionMetadata> extensions = new TreeMap<>();
-
-}
+package org.apache.pulsar.proxy.util;
\ No newline at end of file
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
index 062db18..098d892 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
@@ -91,7 +91,7 @@ public class ProxyConnectionThrottlingTest extends MockedPulsarServiceBaseTest {
.operationTimeout(1000, TimeUnit.MILLISECONDS)
.build();
- Assert.assertEquals(ProxyService.rejectedConnections.get(), 0.0d);
+ Assert.assertEquals(ProxyService.REJECTED_CONNECTIONS.get(), 0.0d);
try {
@Cleanup
Producer<byte[]> producer2 = client2.newProducer(Schema.BYTES).topic("persistent://sample/test/local/producer-topic-1").create();
@@ -101,7 +101,7 @@ public class ProxyConnectionThrottlingTest extends MockedPulsarServiceBaseTest {
// OK
}
// should add retry count since retry every 100ms and operation timeout is set to 1000ms
- Assert.assertEquals(ProxyService.rejectedConnections.get(), 5.0d);
+ Assert.assertEquals(ProxyService.REJECTED_CONNECTIONS.get(), 5.0d);
}
private static final Logger LOG = LoggerFactory.getLogger(ProxyConnectionThrottlingTest.class);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
index fa3c485..25df835 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
@@ -99,7 +99,7 @@ public class ProxyLookupThrottlingTest extends MockedPulsarServiceBaseTest {
} catch (Exception ex) {
// Ignore
}
- Assert.assertEquals(LookupProxyHandler.rejectedPartitionsMetadataRequests.get(), 5.0d);
+ Assert.assertEquals(LookupProxyHandler.REJECTED_PARTITIONS_METADATA_REQUESTS.get(), 5.0d);
proxyService.getLookupRequestSemaphore().release();
try {
@Cleanup
@@ -109,6 +109,6 @@ public class ProxyLookupThrottlingTest extends MockedPulsarServiceBaseTest {
Assert.fail("Should not have failed since can acquire LookupRequestSemaphore");
}
- Assert.assertEquals(LookupProxyHandler.rejectedPartitionsMetadataRequests.get(), 5.0d);
+ Assert.assertEquals(LookupProxyHandler.REJECTED_PARTITIONS_METADATA_REQUESTS.get(), 5.0d);
}
}