You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/12/16 00:40:49 UTC

[pulsar] branch master updated: Enable CheckStyle Plugin in Pulsar proxy (#13343)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new cba8800  Enable CheckStyle Plugin in Pulsar proxy (#13343)
cba8800 is described below

commit cba8800de1013d0e8ac81f43ecb040a55978c358
Author: ZhangJian He <sh...@gmail.com>
AuthorDate: Thu Dec 16 08:39:28 2021 +0800

    Enable CheckStyle Plugin in Pulsar proxy (#13343)
---
 .../src/main/resources/pulsar/suppressions.xml     |  2 +
 pulsar-proxy/pom.xml                               | 13 +++
 .../proxy/extensions/ExtensionsDefinitions.java    |  5 +-
 .../pulsar/proxy/extensions/ProxyExtension.java    |  5 +-
 .../proxy/extensions/ProxyExtensionMetadata.java   |  3 +-
 .../extensions/ProxyExtensionWithClassLoader.java  |  7 +-
 .../pulsar/proxy/extensions/ProxyExtensions.java   | 13 ++-
 .../proxy/extensions/ProxyExtensionsUtils.java     | 14 ++--
 ...xtensionsDefinitions.java => package-info.java} | 19 +----
 .../pulsar/proxy/server/AdminProxyHandler.java     | 21 +++--
 .../proxy/server/BrokerDiscoveryProvider.java      |  8 +-
 .../pulsar/proxy/server/DirectProxyHandler.java    | 67 +++++++--------
 .../pulsar/proxy/server/LookupProxyHandler.java    | 97 ++++++++++------------
 .../pulsar/proxy/server/ParserProxyHandler.java    | 86 ++++++++++---------
 .../apache/pulsar/proxy/server/ProxyClientCnx.java |  5 +-
 .../pulsar/proxy/server/ProxyConfiguration.java    | 30 +++----
 .../pulsar/proxy/server/ProxyConnection.java       | 44 +++++-----
 .../pulsar/proxy/server/ProxyConnectionPool.java   |  4 +-
 .../apache/pulsar/proxy/server/ProxyService.java   | 29 ++++---
 .../pulsar/proxy/server/ProxyServiceStarter.java   | 51 ++++++------
 .../proxy/server/ServiceChannelInitializer.java    | 13 ++-
 .../org/apache/pulsar/proxy/server/WebServer.java  | 13 +--
 .../package-info.java}                             | 19 +----
 .../org/apache/pulsar/proxy/stats/ProxyStats.java  | 23 ++---
 .../apache/pulsar/proxy/stats/RestException.java   |  2 -
 .../org/apache/pulsar/proxy/stats/TopicStats.java  |  4 +-
 .../package-info.java}                             | 19 +----
 .../package-info.java}                             | 19 +----
 .../server/ProxyConnectionThrottlingTest.java      |  4 +-
 .../proxy/server/ProxyLookupThrottlingTest.java    |  4 +-
 30 files changed, 292 insertions(+), 351 deletions(-)

diff --git a/buildtools/src/main/resources/pulsar/suppressions.xml b/buildtools/src/main/resources/pulsar/suppressions.xml
index 0b120bb..ef9658c 100644
--- a/buildtools/src/main/resources/pulsar/suppressions.xml
+++ b/buildtools/src/main/resources/pulsar/suppressions.xml
@@ -39,6 +39,8 @@
     <!-- suppress almost all checks except below-->
     <suppress checks="^(?!.*UnusedImports).*$" files=".*[\\/]src[\\/]test[\\/].*"/>
 
+    <suppress checks="IllegalImport" files="ProxyServiceStarter.java"/>
+
     <!-- suppress all checks in the copied code -->
     <suppress checks=".*" files=".+[\\/]com[\\/]scurrilous[\\/]circe[\\/].+\.java"/>
 
diff --git a/pulsar-proxy/pom.xml b/pulsar-proxy/pom.xml
index 0c8e649..529cc77 100644
--- a/pulsar-proxy/pom.xml
+++ b/pulsar-proxy/pom.xml
@@ -198,6 +198,19 @@
           </execution>
         </executions>
       </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>checkstyle</id>
+            <phase>verify</phase>
+            <goals>
+              <goal>check</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
   <profiles>
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java
index 844c7ca..2cdbad6 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java
@@ -18,11 +18,10 @@
  */
 package org.apache.pulsar.proxy.extensions;
 
-import lombok.Data;
-import lombok.experimental.Accessors;
-
 import java.util.Map;
 import java.util.TreeMap;
+import lombok.Data;
+import lombok.experimental.Accessors;
 
 /**
  * The collection of Proxy Extensions.
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtension.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtension.java
index b973e10..928a4b4 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtension.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtension.java
@@ -20,14 +20,13 @@ package org.apache.pulsar.proxy.extensions;
 
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.socket.SocketChannel;
+import java.net.InetSocketAddress;
+import java.util.Map;
 import org.apache.pulsar.common.classification.InterfaceAudience;
 import org.apache.pulsar.common.classification.InterfaceStability;
 import org.apache.pulsar.proxy.server.ProxyConfiguration;
 import org.apache.pulsar.proxy.server.ProxyService;
 
-import java.net.InetSocketAddress;
-import java.util.Map;
-
 /**
  * The extension interface for supporting additional extensions on Pulsar Proxy.
  */
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionMetadata.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionMetadata.java
index 632c841..935a393 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionMetadata.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionMetadata.java
@@ -18,11 +18,10 @@
  */
 package org.apache.pulsar.proxy.extensions;
 
+import java.nio.file.Path;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
-import java.nio.file.Path;
-
 /**
  * The metadata of Proxy Extension.
  */
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionWithClassLoader.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionWithClassLoader.java
index 1f6924a..922c339 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionWithClassLoader.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionWithClassLoader.java
@@ -20,6 +20,9 @@ package org.apache.pulsar.proxy.extensions;
 
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.socket.SocketChannel;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
 import lombok.Data;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
@@ -27,10 +30,6 @@ import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.proxy.server.ProxyConfiguration;
 import org.apache.pulsar.proxy.server.ProxyService;
 
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Map;
-
 /**
  * An extension with its classloader.
  */
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensions.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensions.java
index ba3c383..14dae3a 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensions.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensions.java
@@ -21,22 +21,19 @@ package org.apache.pulsar.proxy.extensions;
 import com.google.common.collect.ImmutableMap;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.socket.SocketChannel;
-
+import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.HashMap;
 import java.util.HashSet;
-
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.proxy.server.ProxyConfiguration;
 import org.apache.pulsar.proxy.server.ProxyService;
 
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
 /**
  * A collection of loaded extensions.
  */
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionsUtils.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionsUtils.java
index 2f02827..f8a532d 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionsUtils.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionsUtils.java
@@ -18,12 +18,7 @@
  */
 package org.apache.pulsar.proxy.extensions;
 
-import lombok.experimental.UtilityClass;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.common.nar.NarClassLoader;
-import org.apache.pulsar.common.util.ObjectMapperFactory;
-
+import static com.google.common.base.Preconditions.checkArgument;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.DirectoryStream;
@@ -31,8 +26,11 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Collections;
-
-import static com.google.common.base.Preconditions.checkArgument;
+import lombok.experimental.UtilityClass;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 
 /**
  * Util class to search and load {@link ProxyExtension}s.
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/package-info.java
similarity index 69%
copy from pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java
copy to pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/package-info.java
index 844c7ca..7ef1689 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/package-info.java
@@ -16,21 +16,4 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.proxy.extensions;
-
-import lombok.Data;
-import lombok.experimental.Accessors;
-
-import java.util.Map;
-import java.util.TreeMap;
-
-/**
- * The collection of Proxy Extensions.
- */
-@Data
-@Accessors(fluent = true)
-class ExtensionsDefinitions {
-
-    private final Map<String, ProxyExtensionMetadata> extensions = new TreeMap<>();
-
-}
+package org.apache.pulsar.proxy.extensions;
\ No newline at end of file
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
index 853eb0b..a38e2f3 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.proxy.server;
 
 import static org.apache.commons.lang3.StringUtils.isBlank;
-
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -33,13 +32,11 @@ import java.util.Iterator;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.Executor;
-
 import javax.net.ssl.SSLContext;
 import javax.servlet.ServletConfig;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
-
 import org.apache.pulsar.broker.web.AuthenticationFilter;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
@@ -115,14 +112,16 @@ class AdminProxyHandler extends ProxyServlet {
         String value = config.getInitParameter("maxThreads");
         if (value == null || "-".equals(value)) {
             executor = (Executor) getServletContext().getAttribute("org.eclipse.jetty.server.Executor");
-            if (executor == null)
+            if (executor == null) {
                 throw new IllegalStateException("No server executor for proxy");
+            }
         } else {
             QueuedThreadPool qtp = new QueuedThreadPool(Integer.parseInt(value));
             String servletName = config.getServletName();
             int dot = servletName.lastIndexOf('.');
-            if (dot >= 0)
+            if (dot >= 0) {
                 servletName = servletName.substring(dot + 1);
+            }
             qtp.setName(servletName);
             executor = qtp;
         }
@@ -130,22 +129,26 @@ class AdminProxyHandler extends ProxyServlet {
         client.setExecutor(executor);
 
         value = config.getInitParameter("maxConnections");
-        if (value == null)
+        if (value == null) {
             value = "256";
+        }
         client.setMaxConnectionsPerDestination(Integer.parseInt(value));
 
         value = config.getInitParameter("idleTimeout");
-        if (value == null)
+        if (value == null) {
             value = "30000";
+        }
         client.setIdleTimeout(Long.parseLong(value));
 
         value = config.getInitParameter("requestBufferSize");
-        if (value != null)
+        if (value != null) {
             client.setRequestBufferSize(Integer.parseInt(value));
+        }
 
         value = config.getInitParameter("responseBufferSize");
-        if (value != null)
+        if (value != null){
             client.setResponseBufferSize(Integer.parseInt(value));
+        }
 
         try {
             client.start();
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
index bb90be2..ae81350 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
@@ -19,7 +19,7 @@
 package org.apache.pulsar.proxy.server;
 
 import static org.apache.bookkeeper.common.util.MathUtils.signSafeMod;
-
+import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
@@ -27,7 +27,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.bookkeeper.common.annotation.InterfaceAudience;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.pulsar.broker.PulsarServerException;
@@ -43,8 +42,6 @@ import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.util.concurrent.DefaultThreadFactory;
-
 /**
  * Maintains available active broker list and returns next active broker in round-robin for discovery service.
  * This is an API used by Proxy Extensions.
@@ -151,7 +148,8 @@ public class BrokerDiscoveryProvider implements Closeable {
                 throw new IllegalAccessException(String.format("Failed to get property %s admin data due to %s",
                         topicName.getTenant(), e.getMessage()));
             }
-            if (!service.getAuthorizationService().isTenantAdmin(topicName.getTenant(), role, tenantInfo, authenticationData).get()) {
+            if (!service.getAuthorizationService()
+                    .isTenantAdmin(topicName.getTenant(), role, tenantInfo, authenticationData).get()) {
                 throw new IllegalAccessException("Don't have permission to administrate resources on this tenant");
             }
         }
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index c896be5..ead243a 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.proxy.server;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
-
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
@@ -42,7 +41,6 @@ import io.netty.handler.ssl.SslHandler;
 import io.netty.util.CharsetUtil;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.FutureListener;
-
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -50,16 +48,12 @@ import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Supplier;
-
 import javax.net.ssl.SSLSession;
-
 import lombok.Getter;
-
 import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
 import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.common.tls.TlsHostnameVerifier;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.AuthData;
 import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
@@ -67,6 +61,7 @@ import org.apache.pulsar.common.api.proto.CommandConnected;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.PulsarDecoder;
 import org.apache.pulsar.common.stats.Rate;
+import org.apache.pulsar.common.tls.TlsHostnameVerifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -152,25 +147,29 @@ public class DirectProxyHandler {
                 inboundOutboundChannelMap.put(outboundChannel.id() , inboundChannel.id());
             }
 
-            if (config.isHaProxyProtocolEnabled()) {
-                if (proxyConnection.hasHAProxyMessage()) {
-                    outboundChannel.writeAndFlush(encodeProxyProtocolMessage(proxyConnection.getHAProxyMessage()));
-                } else {
-                    if (inboundChannel.remoteAddress() instanceof InetSocketAddress) {
-                        InetSocketAddress clientAddress = (InetSocketAddress) inboundChannel.remoteAddress();
-                        String sourceAddress = clientAddress.getAddress().getHostAddress();
-                        int sourcePort = clientAddress.getPort();
-                        if (outboundChannel.localAddress() instanceof InetSocketAddress) {
-                            InetSocketAddress proxyAddress = (InetSocketAddress) inboundChannel.remoteAddress();
-                            String destinationAddress = proxyAddress.getAddress().getHostAddress();
-                            int destinationPort = proxyAddress.getPort();
-                            HAProxyMessage msg = new HAProxyMessage(HAProxyProtocolVersion.V1, HAProxyCommand.PROXY,
-                                    HAProxyProxiedProtocol.TCP4, sourceAddress, destinationAddress, sourcePort, destinationPort);
-                            outboundChannel.writeAndFlush(encodeProxyProtocolMessage(msg));
-                            msg.release();
-                        }
-                    }
+            if (!config.isHaProxyProtocolEnabled()) {
+                return;
+            }
+
+            if (proxyConnection.hasHAProxyMessage()) {
+                outboundChannel.writeAndFlush(encodeProxyProtocolMessage(proxyConnection.getHAProxyMessage()));
+            } else {
+                if (!(inboundChannel.remoteAddress() instanceof InetSocketAddress)) {
+                    return;
+                }
+                if (!(outboundChannel.localAddress() instanceof InetSocketAddress)) {
+                    return;
                 }
+                InetSocketAddress clientAddress = (InetSocketAddress) inboundChannel.remoteAddress();
+                String sourceAddress = clientAddress.getAddress().getHostAddress();
+                int sourcePort = clientAddress.getPort();
+                InetSocketAddress proxyAddress = (InetSocketAddress) inboundChannel.remoteAddress();
+                String destinationAddress = proxyAddress.getAddress().getHostAddress();
+                int destinationPort = proxyAddress.getPort();
+                HAProxyMessage msg = new HAProxyMessage(HAProxyProtocolVersion.V1, HAProxyCommand.PROXY,
+                        HAProxyProxiedProtocol.TCP4, sourceAddress, destinationAddress, sourcePort, destinationPort);
+                outboundChannel.writeAndFlush(encodeProxyProtocolMessage(msg));
+                msg.release();
             }
         });
     }
@@ -246,9 +245,9 @@ public class DirectProxyHandler {
                 break;
 
             case HandshakeCompleted:
-                ProxyService.opsCounter.inc();
+                ProxyService.OPS_COUNTER.inc();
                 if (msg instanceof ByteBuf) {
-                    ProxyService.bytesCounter.inc(((ByteBuf) msg).readableBytes());
+                    ProxyService.BYTES_COUNTER.inc(((ByteBuf) msg).readableBytes());
                 }
                 inboundChannel.writeAndFlush(msg).addListener(this);
                 break;
@@ -352,14 +351,16 @@ public class DirectProxyHandler {
                     // Enable parsing feature, proxyLogLevel(1 or 2)
                     // Add parser handler
                     if (connected.hasMaxMessageSize()) {
-                        inboundChannel.pipeline().replace("frameDecoder", "newFrameDecoder",
-                                                          new LengthFieldBasedFrameDecoder(connected.getMaxMessageSize()
-                                                                                           + Commands.MESSAGE_SIZE_FRAME_PADDING,
-                                                                                           0, 4, 0, 4));
+                        inboundChannel.pipeline()
+                                .replace("frameDecoder", "newFrameDecoder",
+                                        new LengthFieldBasedFrameDecoder(connected.getMaxMessageSize()
+                                                + Commands.MESSAGE_SIZE_FRAME_PADDING,
+                                                0, 4, 0, 4));
                         outboundChannel.pipeline().replace("frameDecoder", "newFrameDecoder",
-                                                           new LengthFieldBasedFrameDecoder(
-                                                               connected.getMaxMessageSize()
-                                                               + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
+                                new LengthFieldBasedFrameDecoder(
+                                        connected.getMaxMessageSize()
+                                                + Commands.MESSAGE_SIZE_FRAME_PADDING,
+                                        0, 4, 0, 4));
 
                         inboundChannel.pipeline().addBefore("handler", "inboundParser",
                                                             new ParserProxyHandler(service, inboundChannel,
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
index b14bea5..6319824 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
@@ -19,67 +19,64 @@
 package org.apache.pulsar.proxy.server;
 
 import static org.apache.commons.lang3.StringUtils.isBlank;
-
+import io.netty.buffer.ByteBuf;
+import io.prometheus.client.Counter;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Optional;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.common.protocol.Commands;
-import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
 import org.apache.pulsar.common.api.proto.CommandGetSchema;
+import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
 import org.apache.pulsar.common.api.proto.CommandLookupTopic;
 import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse.LookupType;
 import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata;
 import org.apache.pulsar.common.api.proto.ServerError;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.buffer.ByteBuf;
-import io.prometheus.client.Counter;
-
 public class LookupProxyHandler {
     private final String throttlingErrorMessage = "Too many concurrent lookup and partitionsMetadata requests";
     private final ProxyService service;
     private final ProxyConnection proxyConnection;
     private final boolean connectWithTLS;
 
-    private SocketAddress clientAddress;
-    private String brokerServiceURL;
+    private final SocketAddress clientAddress;
+    private final String brokerServiceURL;
 
-    private static final Counter lookupRequests = Counter
+    private static final Counter LOOKUP_REQUESTS = Counter
             .build("pulsar_proxy_lookup_requests", "Counter of topic lookup requests").create().register();
 
-    private static final Counter partitionsMetadataRequests = Counter
+    private static final Counter PARTITIONS_METADATA_REQUESTS = Counter
             .build("pulsar_proxy_partitions_metadata_requests", "Counter of partitions metadata requests").create()
             .register();
 
-    private static final Counter getTopicsOfNamespaceRequestss = Counter
+    private static final Counter GET_TOPICS_OF_NAMESPACE_REQUESTS = Counter
             .build("pulsar_proxy_get_topics_of_namespace_requests", "Counter of getTopicsOfNamespace requests")
             .create()
             .register();
 
-    private static final Counter getSchemaRequests = Counter
+    private static final Counter GET_SCHEMA_REQUESTS = Counter
             .build("pulsar_proxy_get_schema_requests", "Counter of schema requests")
             .create()
             .register();
 
-    static final Counter rejectedLookupRequests = Counter.build("pulsar_proxy_rejected_lookup_requests",
+    static final Counter REJECTED_LOOKUP_REQUESTS = Counter.build("pulsar_proxy_rejected_lookup_requests",
             "Counter of topic lookup requests rejected due to throttling").create().register();
 
-    static final Counter rejectedPartitionsMetadataRequests = Counter
+    static final Counter REJECTED_PARTITIONS_METADATA_REQUESTS = Counter
             .build("pulsar_proxy_rejected_partitions_metadata_requests",
                     "Counter of partitions metadata requests rejected due to throttling")
             .create().register();
 
-    static final Counter rejectedGetTopicsOfNamespaceRequests = Counter
+    static final Counter REJECTED_GET_TOPICS_OF_NAMESPACE_REQUESTS = Counter
             .build("pulsar_proxy_rejected_get_topics_of_namespace_requests",
                     "Counter of getTopicsOfNamespace requests rejected due to throttling")
             .create().register();
@@ -99,7 +96,7 @@ public class LookupProxyHandler {
         }
         long clientRequestId = lookup.getRequestId();
         if (this.service.getLookupRequestSemaphore().tryAcquire()) {
-            lookupRequests.inc();
+            LOOKUP_REQUESTS.inc();
             String topic = lookup.getTopic();
             String serviceUrl;
             if (isBlank(brokerServiceURL)) {
@@ -121,7 +118,7 @@ public class LookupProxyHandler {
             performLookup(clientRequestId, topic, serviceUrl, false, 10);
             this.service.getLookupRequestSemaphore().release();
         } else {
-            rejectedLookupRequests.inc();
+            REJECTED_LOOKUP_REQUESTS.inc();
             if (log.isDebugEnabled()) {
                 log.debug("Lookup Request ID {} from {} rejected - {}.", clientRequestId, clientAddress,
                         throttlingErrorMessage);
@@ -179,9 +176,9 @@ public class LookupProxyHandler {
                         // to use the appropriate target broker (and port) when it
                         // will connect back.
                         if (log.isDebugEnabled()) {
-                            log.debug(
-                                "Successfully perform lookup '{}' for topic '{}' with clientReq Id '{}' and lookup-broker {}",
-                                addr, topic, clientRequestId, brokerUrl);
+                            log.debug("Successfully perform lookup '{}' for topic '{}'"
+                                            + " with clientReq Id '{}' and lookup-broker {}",
+                                    addr, topic, clientRequestId, brokerUrl);
                         }
                         proxyConnection.ctx().writeAndFlush(Commands.newLookupResponse(brokerUrl, brokerUrl, true,
                             LookupType.Connect, clientRequestId, true /* this is coming from proxy */));
@@ -198,7 +195,7 @@ public class LookupProxyHandler {
     }
 
     public void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata partitionMetadata) {
-        partitionsMetadataRequests.inc();
+        PARTITIONS_METADATA_REQUESTS.inc();
         if (log.isDebugEnabled()) {
             log.debug("[{}] Received PartitionMetadataLookup", clientAddress);
         }
@@ -207,7 +204,7 @@ public class LookupProxyHandler {
             handlePartitionMetadataResponse(partitionMetadata, clientRequestId);
             this.service.getLookupRequestSemaphore().release();
         } else {
-            rejectedPartitionsMetadataRequests.inc();
+            REJECTED_PARTITIONS_METADATA_REQUESTS.inc();
             if (log.isDebugEnabled()) {
                 log.debug("PartitionMetaData Request ID {} from {} rejected - {}.", clientRequestId, clientAddress,
                         throttlingErrorMessage);
@@ -270,7 +267,7 @@ public class LookupProxyHandler {
     }
 
     public void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace) {
-        getTopicsOfNamespaceRequestss.inc();
+        GET_TOPICS_OF_NAMESPACE_REQUESTS.inc();
         if (log.isDebugEnabled()) {
             log.debug("[{}] Received GetTopicsOfNamespace", clientAddress);
         }
@@ -281,7 +278,7 @@ public class LookupProxyHandler {
             handleGetTopicsOfNamespace(commandGetTopicsOfNamespace, requestId);
             this.service.getLookupRequestSemaphore().release();
         } else {
-            rejectedGetTopicsOfNamespaceRequests.inc();
+            REJECTED_GET_TOPICS_OF_NAMESPACE_REQUESTS.inc();
             if (log.isDebugEnabled()) {
                 log.debug("GetTopicsOfNamespace Request ID {} from {} rejected - {}.", requestId, clientAddress,
                     throttlingErrorMessage);
@@ -296,11 +293,11 @@ public class LookupProxyHandler {
                                             long clientRequestId) {
         String serviceUrl = getBrokerServiceUrl(clientRequestId);
 
-        if(!StringUtils.isNotBlank(serviceUrl)) {
+        if (!StringUtils.isNotBlank(serviceUrl)) {
             return;
         }
-        performGetTopicsOfNamespace(clientRequestId, commandGetTopicsOfNamespace.getNamespace(), serviceUrl, 10,
-            commandGetTopicsOfNamespace.getMode());
+        performGetTopicsOfNamespace(clientRequestId, commandGetTopicsOfNamespace.getNamespace(), serviceUrl,
+                10, commandGetTopicsOfNamespace.getMode());
     }
 
     private void performGetTopicsOfNamespace(long clientRequestId,
@@ -316,7 +313,7 @@ public class LookupProxyHandler {
 
         InetSocketAddress addr = getAddr(brokerServiceUrl, clientRequestId);
 
-        if(addr == null){
+        if (addr == null) {
             return;
         }
 
@@ -331,7 +328,8 @@ public class LookupProxyHandler {
             command = Commands.newGetTopicsOfNamespaceRequest(namespaceName, requestId, mode);
             clientCnx.newGetTopicsOfNamespace(command, requestId).whenComplete((r, t) -> {
                 if (t != null) {
-                    log.warn("[{}] Failed to get TopicsOfNamespace {}: {}", clientAddress, namespaceName, t.getMessage());
+                    log.warn("[{}] Failed to get TopicsOfNamespace {}: {}",
+                            clientAddress, namespaceName, t.getMessage());
                     proxyConnection.ctx().writeAndFlush(
                         Commands.newError(clientRequestId, ServerError.ServiceNotReady, t.getMessage()));
                 } else {
@@ -350,7 +348,7 @@ public class LookupProxyHandler {
     }
 
     public void handleGetSchema(CommandGetSchema commandGetSchema) {
-        getSchemaRequests.inc();
+        GET_SCHEMA_REQUESTS.inc();
         if (log.isDebugEnabled()) {
             log.debug("[{}] Received GetSchema {}", clientAddress, commandGetSchema);
         }
@@ -365,12 +363,12 @@ public class LookupProxyHandler {
             schemaVersion = Optional.empty();
         }
 
-        if(!StringUtils.isNotBlank(serviceUrl)) {
+        if (!StringUtils.isNotBlank(serviceUrl)) {
             return;
         }
         InetSocketAddress addr = getAddr(serviceUrl, clientRequestId);
 
-        if(addr == null){
+        if (addr == null) {
             return;
         }
         if (log.isDebugEnabled()) {
@@ -405,27 +403,24 @@ public class LookupProxyHandler {
     }
 
     /**
-     *  Get default broker service url or discovery an available broker
+     *  Get default broker service url or discover an available broker.
      **/
     private String getBrokerServiceUrl(long clientRequestId) {
-        if (isBlank(brokerServiceURL)) {
-            ServiceLookupData availableBroker;
-            try {
-                availableBroker = service.getDiscoveryProvider().nextBroker();
-            } catch (Exception e) {
-                log.warn("[{}] Failed to get next active broker {}", clientAddress, e.getMessage(), e);
-                proxyConnection.ctx().writeAndFlush(Commands.newError(
-                        clientRequestId, ServerError.ServiceNotReady, e.getMessage()
-                ));
-                return null;
-            }
-            return this.connectWithTLS ?
-                    availableBroker.getPulsarServiceUrlTls() : availableBroker.getPulsarServiceUrl();
-        } else {
-            return this.connectWithTLS ?
-                    service.getConfiguration().getBrokerServiceURLTLS() : service.getConfiguration().getBrokerServiceURL();
+        if (StringUtils.isNotBlank(brokerServiceURL)) {
+            return this.connectWithTLS ? service.getConfiguration().getBrokerServiceURLTLS()
+                    : service.getConfiguration().getBrokerServiceURL();
         }
-
+        ServiceLookupData availableBroker;
+        try {
+            availableBroker = service.getDiscoveryProvider().nextBroker();
+        } catch (Exception e) {
+            log.warn("[{}] Failed to get next active broker {}", clientAddress, e.getMessage(), e);
+            proxyConnection.ctx().writeAndFlush(Commands.newError(
+                    clientRequestId, ServerError.ServiceNotReady, e.getMessage()
+            ));
+            return null;
+        }
+        return this.connectWithTLS ? availableBroker.getPulsarServiceUrlTls() : availableBroker.getPulsarServiceUrl();
     }
 
     private InetSocketAddress getAddr(String brokerServiceUrl, long clientRequestId) {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
index f152b32..40c05a3 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
@@ -19,12 +19,18 @@
 
 package org.apache.pulsar.proxy.server;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-
 import org.apache.commons.lang3.mutable.MutableLong;
 import org.apache.pulsar.common.api.proto.BaseCommand;
 import org.apache.pulsar.common.api.raw.MessageParser;
@@ -34,34 +40,31 @@ import org.apache.pulsar.proxy.stats.TopicStats;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufUtil;
-import io.netty.buffer.CompositeByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-
 
 public class ParserProxyHandler extends ChannelInboundHandlerAdapter {
 
 
-    private Channel channel;
+    private final Channel channel;
     //inbound
     protected static final String FRONTEND_CONN = "frontendconn";
     //outbound
     protected static final String BACKEND_CONN = "backendconn";
 
-    private String connType;
+    private final String connType;
 
-    private int maxMessageSize;
+    private final int maxMessageSize;
     private final ProxyService service;
 
 
-    //producerid+channelid as key
-    //or consumerid+channelid as key
-    private static Map<String, String> producerHashMap = new ConcurrentHashMap<>();
-    private static Map<String, String> consumerHashMap = new ConcurrentHashMap<>();
+    /**
+     * producerid + channelid as key.
+     */
+    private static final Map<String, String> producerHashMap = new ConcurrentHashMap<>();
+
+    /**
+     * consumerid + channelid as key.
+     */
+    private static final Map<String, String> consumerHashMap = new ConcurrentHashMap<>();
 
     public ParserProxyHandler(ProxyService service, Channel channel, String type, int maxMessageSize) {
         this.service = service;
@@ -70,23 +73,26 @@ public class ParserProxyHandler extends ChannelInboundHandlerAdapter {
         this.maxMessageSize = maxMessageSize;
     }
 
-    private void logging(Channel conn, BaseCommand.Type cmdtype, String info, List<RawMessage> messages) throws Exception{
+    private void logging(Channel conn, BaseCommand.Type cmdtype, String info, List<RawMessage> messages) {
 
         if (messages != null) {
             // lag
             StringBuilder infoBuilder = new StringBuilder(info);
             for (RawMessage message : messages) {
-                infoBuilder.append("[").append(System.currentTimeMillis() - message.getPublishTime()).append("] ").append(new String(ByteBufUtil.getBytes(message.getData()), StandardCharsets.UTF_8));
+                infoBuilder.append("[").append(System.currentTimeMillis() - message.getPublishTime()).append("] ")
+                        .append(new String(ByteBufUtil.getBytes(message.getData()), StandardCharsets.UTF_8));
             }
             info = infoBuilder.toString();
         }
         // log conn format is like from source to target
         switch (this.connType) {
             case ParserProxyHandler.FRONTEND_CONN:
-                log.info(ParserProxyHandler.FRONTEND_CONN + ":{} cmd:{} msg:{}", "[" + conn.remoteAddress() + conn.localAddress() + "]", cmdtype, info);
+                log.info(ParserProxyHandler.FRONTEND_CONN + ":{} cmd:{} msg:{}", "[" + conn.remoteAddress()
+                        + conn.localAddress() + "]", cmdtype, info);
                 break;
             case ParserProxyHandler.BACKEND_CONN:
-                log.info(ParserProxyHandler.BACKEND_CONN + ":{} cmd:{} msg:{}", "[" + conn.localAddress() + conn.remoteAddress() + "]", cmdtype, info);
+                log.info(ParserProxyHandler.BACKEND_CONN + ":{} cmd:{} msg:{}", "[" + conn.localAddress()
+                        + conn.remoteAddress() + "]", cmdtype, info);
                 break;
         }
     }
@@ -94,9 +100,9 @@ public class ParserProxyHandler extends ChannelInboundHandlerAdapter {
     private final BaseCommand cmd = new BaseCommand();
 
     public void channelRead(ChannelHandlerContext ctx, Object msg) {
-        TopicName topicName ;
+        TopicName topicName;
         List<RawMessage> messages = new ArrayList<>();
-        ByteBuf buffer = (ByteBuf)(msg);
+        ByteBuf buffer = (ByteBuf) (msg);
 
         try {
             buffer.markReaderIndex();
@@ -107,9 +113,11 @@ public class ParserProxyHandler extends ChannelInboundHandlerAdapter {
 
             switch (cmd.getType()) {
                 case PRODUCER:
-                    ParserProxyHandler.producerHashMap.put(String.valueOf(cmd.getProducer().getProducerId()) + "," + String.valueOf(ctx.channel().id()), cmd.getProducer().getTopic());
+                    ParserProxyHandler.producerHashMap.put(cmd.getProducer().getProducerId() + "," + ctx.channel().id(),
+                            cmd.getProducer().getTopic());
 
-                    logging(ctx.channel() , cmd.getType() , "{producer:" + cmd.getProducer().getProducerName() + ",topic:" + cmd.getProducer().getTopic() + "}", null);
+                    logging(ctx.channel() , cmd.getType() , "{producer:" + cmd.getProducer().getProducerName()
+                            + ",topic:" + cmd.getProducer().getTopic() + "}", null);
                     break;
 
                 case SEND:
@@ -117,10 +125,11 @@ public class ParserProxyHandler extends ChannelInboundHandlerAdapter {
                         logging(ctx.channel() , cmd.getType() , "", null);
                         break;
                     }
-                    topicName = TopicName.get(ParserProxyHandler.producerHashMap.get(String.valueOf(cmd.getSend().getProducerId()) + "," + String.valueOf(ctx.channel().id())));
+                    topicName = TopicName.get(ParserProxyHandler.producerHashMap.get(cmd.getSend().getProducerId() + ","
+                            + ctx.channel().id()));
                     MutableLong msgBytes = new MutableLong(0);
-                    MessageParser.parseMessage(topicName,  -1L,
-                            -1L,buffer,(message) -> {
+                    MessageParser.parseMessage(topicName, -1L,
+                            -1L, buffer, (message) -> {
                                 messages.add(message);
                                 msgBytes.add(message.getData().readableBytes());
                             }, maxMessageSize);
@@ -132,9 +141,11 @@ public class ParserProxyHandler extends ChannelInboundHandlerAdapter {
                     break;
 
                 case SUBSCRIBE:
-                    ParserProxyHandler.consumerHashMap.put(String.valueOf(cmd.getSubscribe().getConsumerId()) + "," + String.valueOf(ctx.channel().id()) , cmd.getSubscribe().getTopic());
+                    ParserProxyHandler.consumerHashMap.put(cmd.getSubscribe().getConsumerId() + ","
+                                    + ctx.channel().id(), cmd.getSubscribe().getTopic());
 
-                    logging(ctx.channel() , cmd.getType() , "{consumer:" + cmd.getSubscribe().getConsumerName() + ",topic:" + cmd.getSubscribe().getTopic() + "}" , null);
+                    logging(ctx.channel() , cmd.getType() , "{consumer:" + cmd.getSubscribe().getConsumerName()
+                            + ",topic:" + cmd.getSubscribe().getTopic() + "}" , null);
                     break;
 
                 case MESSAGE:
@@ -142,13 +153,14 @@ public class ParserProxyHandler extends ChannelInboundHandlerAdapter {
                         logging(ctx.channel() , cmd.getType() , "" , null);
                         break;
                     }
-                    topicName = TopicName.get(ParserProxyHandler.consumerHashMap.get(String.valueOf(cmd.getMessage().getConsumerId()) + "," + DirectProxyHandler.inboundOutboundChannelMap.get(ctx.channel().id())));
+                    topicName = TopicName.get(ParserProxyHandler.consumerHashMap.get(cmd.getMessage().getConsumerId()
+                            + "," + DirectProxyHandler.inboundOutboundChannelMap.get(ctx.channel().id())));
                     msgBytes = new MutableLong(0);
-                    MessageParser.parseMessage(topicName,  -1L,
-                                -1L,buffer,(message) -> {
-                                    messages.add(message);
-                                    msgBytes.add(message.getData().readableBytes());
-                                }, maxMessageSize);
+                    MessageParser.parseMessage(topicName, -1L,
+                            -1L, buffer, (message) -> {
+                                messages.add(message);
+                                msgBytes.add(message.getData().readableBytes());
+                            }, maxMessageSize);
                     // update topic stats
                     topicStats = this.service.getTopicStats().computeIfAbsent(topicName.toString(),
                             topic -> new TopicStats());
@@ -172,8 +184,8 @@ public class ParserProxyHandler extends ChannelInboundHandlerAdapter {
             ByteBuf totalSizeBuf = Unpooled.buffer(4);
             totalSizeBuf.writeInt(buffer.readableBytes());
             CompositeByteBuf compBuf = Unpooled.compositeBuffer();
-            compBuf.addComponents(totalSizeBuf,buffer);
-            compBuf.writerIndex(totalSizeBuf.capacity()+buffer.capacity());
+            compBuf.addComponents(totalSizeBuf, buffer);
+            compBuf.writerIndex(totalSizeBuf.capacity() + buffer.capacity());
 
             // Release mssages
             messages.forEach(RawMessage::release);
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
index 665b9f8..50a77d3 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.proxy.server;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.EventLoopGroup;
-
 import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
@@ -48,8 +47,8 @@ public class ProxyClientCnx extends ClientCnx {
     @Override
     protected ByteBuf newConnectCommand() throws Exception {
         if (log.isDebugEnabled()) {
-            log.debug("New Connection opened via ProxyClientCnx with params clientAuthRole = {}," +
-                    " clientAuthData = {}, clientAuthMethod = {}",
+            log.debug("New Connection opened via ProxyClientCnx with params clientAuthRole = {},"
+                            + " clientAuthData = {}, clientAuthMethod = {}",
                     clientAuthRole, clientAuthData, clientAuthMethod);
         }
 
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index 5238c46..41ec92f 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -26,10 +26,9 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.TreeSet;
-import java.util.stream.Collectors;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-
+import java.util.stream.Collectors;
 import lombok.Getter;
 import lombok.Setter;
 import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
@@ -149,7 +148,7 @@ public class ProxyConfiguration implements PulsarConfiguration {
     )
     private String advertisedAddress;
 
-    @FieldContext(category=CATEGORY_SERVER,
+    @FieldContext(category = CATEGORY_SERVER,
             doc = "Enable or disable the proxy protocol.")
     private boolean haProxyProtocolEnabled;
 
@@ -300,7 +299,7 @@ public class ProxyConfiguration implements PulsarConfiguration {
     )
     private String anonymousUserRole = null;
 
-    /***** --- TLS --- ****/
+    // TLS
 
     @Deprecated
     private boolean tlsEnabledInProxy = false;
@@ -361,7 +360,7 @@ public class ProxyConfiguration implements PulsarConfiguration {
     )
     private boolean tlsRequireTrustedClientCertOnConnect = false;
 
-    /**** --- KeyStore TLS config variables --- ****/
+    // KeyStore TLS config variables
 
     @FieldContext(
             category = CATEGORY_KEYSTORE_TLS,
@@ -411,7 +410,9 @@ public class ProxyConfiguration implements PulsarConfiguration {
     )
     private String tlsTrustStorePassword = null;
 
-    /**** --- KeyStore TLS config variables used for proxy to auth with broker--- ****/
+    /**
+     * KeyStore TLS config variables used for proxy to auth with broker.
+     */
     @FieldContext(
             category = CATEGORY_KEYSTORE_TLS,
             doc = "Whether the Pulsar proxy use KeyStore type to authenticate with Pulsar brokers"
@@ -459,7 +460,7 @@ public class ProxyConfiguration implements PulsarConfiguration {
     )
     private Set<String> brokerClientTlsProtocols = new TreeSet<>();
 
-    /***** --- HTTP --- ****/
+    // HTTP
 
     @FieldContext(
         category = CATEGORY_HTTP,
@@ -478,7 +479,7 @@ public class ProxyConfiguration implements PulsarConfiguration {
             + "proxy, this should be set to the minimum value, 1, so that clients "
             + "see the data as soon as possible."
     )
-    private int httpOutputBufferSize = 32*1024;
+    private int httpOutputBufferSize = 32 * 1024;
 
     @FieldContext(
             minValue = 1,
@@ -532,15 +533,16 @@ public class ProxyConfiguration implements PulsarConfiguration {
     private Set<String> additionalServlets = new TreeSet<>();
 
     @FieldContext(
-            category =  CATEGORY_HTTP,
+            category = CATEGORY_HTTP,
             doc = "Enable the enforcement of limits on the incoming HTTP requests"
-        )
+    )
     private boolean httpRequestsLimitEnabled = false;
 
     @FieldContext(
-            category =  CATEGORY_HTTP,
-            doc = "Max HTTP requests per seconds allowed. The excess of requests will be rejected with HTTP code 429 (Too many requests)"
-        )
+            category = CATEGORY_HTTP,
+            doc = "Max HTTP requests per seconds allowed."
+                    + " The excess of requests will be rejected with HTTP code 429 (Too many requests)"
+    )
     private double httpRequestsMaxPerSecond = 100.0;
 
     @PropertiesContext(
@@ -587,7 +589,7 @@ public class ProxyConfiguration implements PulsarConfiguration {
     )
     private boolean useSeparateThreadPoolForProxyExtensions = true;
 
-    /***** --- WebSocket --- ****/
+    /***** --- WebSocket. --- ****/
     @FieldContext(
             category = CATEGORY_WEBSOCKET,
             doc = "Enable or disable the WebSocket servlet"
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 87d84f7..a6dba1c 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -19,15 +19,19 @@
 package org.apache.pulsar.proxy.server;
 
 import static com.google.common.base.Preconditions.checkArgument;
-
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.haproxy.HAProxyMessage;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.FutureListener;
 import java.net.SocketAddress;
 import java.util.concurrent.TimeUnit;
-
 import java.util.function.Supplier;
 import javax.naming.AuthenticationException;
 import javax.net.ssl.SSLSession;
-
-import io.netty.handler.codec.haproxy.HAProxyMessage;
+import lombok.Getter;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authentication.AuthenticationProvider;
 import org.apache.pulsar.broker.authentication.AuthenticationState;
@@ -40,29 +44,21 @@ import org.apache.pulsar.client.impl.PulsarChannelInitializer;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.api.AuthData;
-import org.apache.pulsar.common.protocol.Commands;
-import org.apache.pulsar.common.protocol.PulsarHandler;
 import org.apache.pulsar.common.api.proto.CommandAuthResponse;
 import org.apache.pulsar.common.api.proto.CommandConnect;
+import org.apache.pulsar.common.api.proto.CommandGetSchema;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
 import org.apache.pulsar.common.api.proto.CommandLookupTopic;
-import org.apache.pulsar.common.api.proto.CommandGetSchema;
 import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata;
 import org.apache.pulsar.common.api.proto.ProtocolVersion;
 import org.apache.pulsar.common.api.proto.ServerError;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.PulsarHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.ssl.SslHandler;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.FutureListener;
-import lombok.Getter;
-
 /**
- * Handles incoming discovery request from client and sends appropriate response back to client
+ * Handles incoming discovery request from client and sends appropriate response back to client.
  *
  */
 public class ProxyConnection extends PulsarHandler implements FutureListener<Void> {
@@ -125,10 +121,10 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
     @Override
     public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
         super.channelRegistered(ctx);
-        ProxyService.activeConnections.inc();
-        if (ProxyService.activeConnections.get() > service.getConfiguration().getMaxConcurrentInboundConnections()) {
+        ProxyService.ACTIVE_CONNECTIONS.inc();
+        if (ProxyService.ACTIVE_CONNECTIONS.get() > service.getConfiguration().getMaxConcurrentInboundConnections()) {
             ctx.close();
-            ProxyService.rejectedConnections.inc();
+            ProxyService.REJECTED_CONNECTIONS.inc();
             return;
         }
     }
@@ -136,13 +132,13 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
     @Override
     public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
         super.channelUnregistered(ctx);
-        ProxyService.activeConnections.dec();
+        ProxyService.ACTIVE_CONNECTIONS.dec();
     }
 
     @Override
     public void channelActive(ChannelHandlerContext ctx) throws Exception {
         super.channelActive(ctx);
-        ProxyService.newConnections.inc();
+        ProxyService.NEW_CONNECTIONS.inc();
         service.getClientCnxs().add(this);
         LOG.info("[{}] New connection opened", remoteAddress);
     }
@@ -195,11 +191,11 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
         case ProxyConnectionToBroker:
             // Pass the buffer to the outbound connection and schedule next read
             // only if we can write on the connection
-            ProxyService.opsCounter.inc();
+            ProxyService.OPS_COUNTER.inc();
             if (msg instanceof ByteBuf) {
                 int bytes = ((ByteBuf) msg).readableBytes();
                 directProxyHandler.getInboundChannelRequestsRate().recordEvent(bytes);
-                ProxyService.bytesCounter.inc(bytes);
+                ProxyService.BYTES_COUNTER.inc(bytes);
             }
             directProxyHandler.outboundChannel.writeAndFlush(msg).addListener(this);
             break;
@@ -400,7 +396,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
     }
 
     /**
-     * handles discovery request from client ands sends next active broker address
+     * handles discovery request from client and sends next active broker address.
      */
     @Override
     protected void handleLookup(CommandLookupTopic lookup) {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnectionPool.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnectionPool.java
index cd1b31d..4dcb095 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnectionPool.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnectionPool.java
@@ -18,10 +18,10 @@
  */
 package org.apache.pulsar.proxy.server;
 
+import io.netty.channel.EventLoopGroup;
 import java.io.IOException;
 import java.util.concurrent.ExecutionException;
 import java.util.function.Supplier;
-
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.ConnectionPool;
@@ -29,8 +29,6 @@ import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.channel.EventLoopGroup;
-
 public class ProxyConnectionPool extends ConnectionPool {
     public ProxyConnectionPool(ClientConfigurationData clientConfig, EventLoopGroup eventLoopGroup,
             Supplier<ClientCnx> clientCnxSupplier) throws PulsarClientException {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 778f42b..2af7ebf 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -28,9 +28,9 @@ import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
-import io.netty.util.concurrent.DefaultThreadFactory;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timer;
+import io.netty.util.concurrent.DefaultThreadFactory;
 import io.prometheus.client.Counter;
 import io.prometheus.client.Gauge;
 import java.io.Closeable;
@@ -69,7 +69,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Pulsar proxy service
+ * Pulsar proxy service.
  */
 public class ProxyService implements Closeable {
 
@@ -108,22 +108,22 @@ public class ProxyService implements Closeable {
 
     private static final int numThreads = Runtime.getRuntime().availableProcessors();
 
-    static final Gauge activeConnections = Gauge
+    static final Gauge ACTIVE_CONNECTIONS = Gauge
             .build("pulsar_proxy_active_connections", "Number of connections currently active in the proxy").create()
             .register();
 
-    static final Counter newConnections = Counter
+    static final Counter NEW_CONNECTIONS = Counter
             .build("pulsar_proxy_new_connections", "Counter of connections being opened in the proxy").create()
             .register();
 
-    static final Counter rejectedConnections = Counter
+    static final Counter REJECTED_CONNECTIONS = Counter
             .build("pulsar_proxy_rejected_connections", "Counter for connections rejected due to throttling").create()
             .register();
 
-    static final Counter opsCounter = Counter
+    static final Counter OPS_COUNTER = Counter
             .build("pulsar_proxy_binary_ops", "Counter of proxy operations").create().register();
 
-    static final Counter bytesCounter = Counter
+    static final Counter BYTES_COUNTER = Counter
             .build("pulsar_proxy_binary_bytes", "Counter of proxy bytes").create().register();
 
     @Getter
@@ -137,11 +137,12 @@ public class ProxyService implements Closeable {
                         AuthenticationService authenticationService) throws Exception {
         requireNonNull(proxyConfig);
         this.proxyConfig = proxyConfig;
-        this.timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-timer", Thread.currentThread().isDaemon()), 1, TimeUnit.MILLISECONDS);
+        this.timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-timer",
+                Thread.currentThread().isDaemon()), 1, TimeUnit.MILLISECONDS);
         this.clientCnxs = Sets.newConcurrentHashSet();
         this.topicStats = new ConcurrentHashMap<>();
 
-        this.lookupRequestSemaphore = new AtomicReference<Semaphore>(
+        this.lookupRequestSemaphore = new AtomicReference<>(
                 new Semaphore(proxyConfig.getMaxConcurrentLookupRequests(), false));
 
         if (proxyConfig.getProxyLogLevel().isPresent()) {
@@ -208,7 +209,8 @@ public class ProxyService implements Closeable {
         // Bind and start to accept incoming connections.
         if (proxyConfig.getServicePort().isPresent()) {
             try {
-                listenChannel = bootstrap.bind(proxyConfig.getBindAddress(), proxyConfig.getServicePort().get()).sync().channel();
+                listenChannel = bootstrap.bind(proxyConfig.getBindAddress(),
+                        proxyConfig.getServicePort().get()).sync().channel();
                 LOG.info("Started Pulsar Proxy at {}", listenChannel.localAddress());
             } catch (Exception e) {
                 throw new IOException("Failed to bind Pulsar Proxy on port " + proxyConfig.getServicePort().get(), e);
@@ -218,7 +220,8 @@ public class ProxyService implements Closeable {
         if (proxyConfig.getServicePortTls().isPresent()) {
             ServerBootstrap tlsBootstrap = bootstrap.clone();
             tlsBootstrap.childHandler(new ServiceChannelInitializer(this, proxyConfig, true));
-            listenChannelTls = tlsBootstrap.bind(proxyConfig.getBindAddress(), proxyConfig.getServicePortTls().get()).sync().channel();
+            listenChannelTls = tlsBootstrap.bind(proxyConfig.getBindAddress(),
+                    proxyConfig.getServicePortTls().get()).sync().channel();
             LOG.info("Started Pulsar TLS Proxy on {}", listenChannelTls.localAddress());
         }
 
@@ -247,8 +250,8 @@ public class ProxyService implements Closeable {
     }
 
     // This call is used for starting additional protocol handlers
-    public void startProxyExtensions(
-            Map<String, Map<InetSocketAddress, ChannelInitializer<SocketChannel>>> protocolHandlers, ServerBootstrap serverBootstrap) {
+    public void startProxyExtensions(Map<String, Map<InetSocketAddress,
+            ChannelInitializer<SocketChannel>>> protocolHandlers, ServerBootstrap serverBootstrap) {
 
         protocolHandlers.forEach((extensionName, initializers) -> {
             initializers.forEach((address, initializer) -> {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
index 315607e..3d81317 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
@@ -25,16 +25,30 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.common.stats.JvmMetrics.getJvmDirectMemoryUsed;
 import static org.slf4j.bridge.SLF4JBridgeHandler.install;
 import static org.slf4j.bridge.SLF4JBridgeHandler.removeHandlersForRootLogger;
-
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
 import com.google.common.annotations.VisibleForTesting;
+import io.netty.util.internal.PlatformDependent;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.Gauge;
+import io.prometheus.client.Gauge.Child;
+import io.prometheus.client.exporter.MetricsServlet;
+import io.prometheus.client.hotspot.DefaultExports;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
 import org.apache.logging.log4j.core.util.datetime.FixedDateFormat;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
-import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServletWithClassLoader;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.configuration.VipStatus;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.util.CmdGenerateDocs;
+import org.apache.pulsar.proxy.stats.ProxyStats;
 import org.apache.pulsar.websocket.WebSocketConsumerServlet;
 import org.apache.pulsar.websocket.WebSocketPingPongServlet;
 import org.apache.pulsar.websocket.WebSocketProducerServlet;
@@ -46,27 +60,9 @@ import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-
-import io.netty.util.internal.PlatformDependent;
-import io.prometheus.client.CollectorRegistry;
-import io.prometheus.client.Gauge;
-import io.prometheus.client.Gauge.Child;
-import io.prometheus.client.exporter.MetricsServlet;
-import io.prometheus.client.hotspot.DefaultExports;
-import org.apache.pulsar.common.configuration.VipStatus;
-import org.apache.pulsar.proxy.stats.ProxyStats;
-
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-
 
 /**
- * Starts an instance of the Pulsar ProxyService
+ * Starts an instance of the Pulsar ProxyService.
  *
  */
 public class ProxyServiceStarter {
@@ -107,7 +103,8 @@ public class ProxyServiceStarter {
             DateFormat dateFormat = new SimpleDateFormat(
                 FixedDateFormat.FixedFormat.ISO8601_OFFSET_DATE_TIME_HHMM.getPattern());
             Thread.setDefaultUncaughtExceptionHandler((thread, exception) -> {
-                System.out.println(String.format("%s [%s] error Uncaught exception in thread %s: %s", dateFormat.format(new Date()), thread.getContextClassLoader(), thread.getName(), exception.getMessage()));
+                System.out.printf("%s [%s] error Uncaught exception in thread %s: %s%n", dateFormat.format(new Date()),
+                        thread.getContextClassLoader(), thread.getName(), exception.getMessage());
                 exception.printStackTrace(System.out);
             });
 
@@ -211,10 +208,10 @@ public class ProxyServiceStarter {
 
     public void close() {
         try {
-            if(proxyService != null) {
+            if (proxyService != null) {
                 proxyService.close();
             }
-            if(server != null) {
+            if (server != null) {
                 server.stop();
             }
         } catch (Exception e) {
@@ -226,10 +223,12 @@ public class ProxyServiceStarter {
                                      ProxyConfiguration config,
                                      ProxyService service,
                                      BrokerDiscoveryProvider discoveryProvider) throws Exception {
-        server.addServlet("/metrics", new ServletHolder(MetricsServlet.class), Collections.emptyList(), config.isAuthenticateMetricsEndpoint());
+        server.addServlet("/metrics", new ServletHolder(MetricsServlet.class),
+                Collections.emptyList(), config.isAuthenticateMetricsEndpoint());
         server.addRestResources("/", VipStatus.class.getPackage().getName(),
                 VipStatus.ATTRIBUTE_STATUS_FILE_PATH, config.getStatusFilePath());
-        server.addRestResources("/proxy-stats", ProxyStats.class.getPackage().getName(), ProxyStats.ATTRIBUTE_PULSAR_PROXY_NAME, service);
+        server.addRestResources("/proxy-stats", ProxyStats.class.getPackage().getName(),
+                ProxyStats.ATTRIBUTE_PULSAR_PROXY_NAME, service);
 
         AdminProxyHandler adminProxyHandler = new AdminProxyHandler(config, discoveryProvider);
         ServletHolder servletHolder = new ServletHolder(adminProxyHandler);
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
index 658dd87..55f0b69 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
@@ -19,7 +19,10 @@
 package org.apache.pulsar.proxy.server;
 
 import static org.apache.commons.lang3.StringUtils.isEmpty;
-
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslHandler;
 import java.util.function.Supplier;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
@@ -28,11 +31,6 @@ import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.OptionalProxyProtocolDecoder;
 import org.apache.pulsar.common.util.NettyClientSslContextRefresher;
 import org.apache.pulsar.common.util.NettyServerSslContextBuilder;
-
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-import io.netty.handler.ssl.SslContext;
 import org.apache.pulsar.common.util.SslContextAutoRefreshBuilder;
 import org.apache.pulsar.common.util.keystoretls.NettySSLContextAutoRefreshBuilder;
 
@@ -77,7 +75,8 @@ public class ServiceChannelInitializer extends ChannelInitializer<SocketChannel>
             } else {
                 serverSslCtxRefresher = new NettyServerSslContextBuilder(serviceConfig.isTlsAllowInsecureConnection(),
                         serviceConfig.getTlsTrustCertsFilePath(), serviceConfig.getTlsCertificateFilePath(),
-                        serviceConfig.getTlsKeyFilePath(), serviceConfig.getTlsCiphers(), serviceConfig.getTlsProtocols(),
+                        serviceConfig.getTlsKeyFilePath(), serviceConfig.getTlsCiphers(),
+                        serviceConfig.getTlsProtocols(),
                         serviceConfig.isTlsRequireTrustedClientCertOnConnect(),
                         serviceConfig.getTlsCertRefreshCheckDurationSec());
             }
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
index c92f722..687bfaf 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
@@ -31,9 +31,9 @@ import javax.servlet.DispatcherType;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.web.AuthenticationFilter;
+import org.apache.pulsar.broker.web.JettyRequestLogFactory;
 import org.apache.pulsar.broker.web.JsonMapperProvider;
 import org.apache.pulsar.broker.web.RateLimitingFilter;
-import org.apache.pulsar.broker.web.JettyRequestLogFactory;
 import org.apache.pulsar.broker.web.WebExecutorThreadPool;
 import org.apache.pulsar.common.util.SecurityUtility;
 import org.apache.pulsar.common.util.keystoretls.KeyStoreSSLContext;
@@ -76,7 +76,7 @@ public class WebServer {
     private ServerConnector connector;
     private ServerConnector connectorTls;
 
-    public WebServer(ProxyConfiguration config, AuthenticationService authenticationService) throws IOException {
+    public WebServer(ProxyConfiguration config, AuthenticationService authenticationService) {
         this.webServiceExecutor = new WebExecutorThreadPool(config.getHttpNumThreads(), "pulsar-external-web");
         this.server = new Server(webServiceExecutor);
         this.authenticationService = authenticationService;
@@ -84,12 +84,12 @@ public class WebServer {
 
         List<ServerConnector> connectors = new ArrayList<>();
 
-        HttpConfiguration http_config = new HttpConfiguration();
-        http_config.setOutputBufferSize(config.getHttpOutputBufferSize());
+        HttpConfiguration httpConfig = new HttpConfiguration();
+        httpConfig.setOutputBufferSize(config.getHttpOutputBufferSize());
 
         if (config.getWebServicePort().isPresent()) {
             this.externalServicePort = config.getWebServicePort().get();
-            connector = new ServerConnector(server, 1, 1, new HttpConnectionFactory(http_config));
+            connector = new ServerConnector(server, 1, 1, new HttpConnectionFactory(httpConfig));
             connector.setHost(config.getBindAddress());
             connector.setPort(externalServicePort);
             connectors.add(connector);
@@ -146,7 +146,8 @@ public class WebServer {
         addServlet(basePath, servletHolder, attributes, true);
     }
 
-    public void addServlet(String basePath, ServletHolder servletHolder, List<Pair<String, Object>> attributes, boolean requireAuthentication) {
+    public void addServlet(String basePath, ServletHolder servletHolder,
+                           List<Pair<String, Object>> attributes, boolean requireAuthentication) {
         Optional<String> existingPath = servletPaths.stream().filter(p -> p.startsWith(basePath)).findFirst();
         if (existingPath.isPresent()) {
             throw new IllegalArgumentException(
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/package-info.java
similarity index 69%
copy from pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java
copy to pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/package-info.java
index 844c7ca..07a8290 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/package-info.java
@@ -16,21 +16,4 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.proxy.extensions;
-
-import lombok.Data;
-import lombok.experimental.Accessors;
-
-import java.util.Map;
-import java.util.TreeMap;
-
-/**
- * The collection of Proxy Extensions.
- */
-@Data
-@Accessors(fluent = true)
-class ExtensionsDefinitions {
-
-    private final Map<String, ProxyExtensionMetadata> extensions = new TreeMap<>();
-
-}
+package org.apache.pulsar.proxy.server;
\ No newline at end of file
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/ProxyStats.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/ProxyStats.java
index f709a25..d021389 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/ProxyStats.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/ProxyStats.java
@@ -18,28 +18,27 @@
  */
 package org.apache.pulsar.proxy.stats;
 
+import io.netty.channel.Channel;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-
 import javax.servlet.ServletContext;
 import javax.ws.rs.GET;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
 import javax.ws.rs.POST;
+import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response.Status;
-
 import org.apache.pulsar.proxy.server.ProxyService;
 
-import io.netty.channel.Channel;
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiOperation;
-import io.swagger.annotations.ApiResponse;
-import io.swagger.annotations.ApiResponses;
+
 
 @Path("/")
 @Api(value = "/proxy-stats", description = "Stats for proxy", tags = "proxy-stats", hidden = true)
@@ -55,7 +54,8 @@ public class ProxyStats {
 
     @GET
     @Path("/connections")
-    @ApiOperation(value = "Proxy stats api to get info for live connections", response = List.class, responseContainer = "List")
+    @ApiOperation(value = "Proxy stats api to get info for live connections",
+            response = List.class, responseContainer = "List")
     @ApiResponses(value = { @ApiResponse(code = 503, message = "Proxy service is not initialized") })
     public List<ConnectionStats> metrics() {
         List<ConnectionStats> stats = new ArrayList<>();
@@ -88,7 +88,8 @@ public class ProxyStats {
 
     @POST
     @Path("/logging/{logLevel}")
-    @ApiOperation(hidden = true, value = "Change proxy logging level dynamically", notes = "It only changes log-level in memory, change it config file to persist the change")
+    @ApiOperation(hidden = true, value = "Change proxy logging level dynamically",
+            notes = "It only changes log-level in memory, change it config file to persist the change")
     @ApiResponses(value = { @ApiResponse(code = 412, message = "Proxy log level can be [0-2]"), })
     public void updateProxyLogLevel(@PathParam("logLevel") int logLevel) {
         if (logLevel < 0 || logLevel > 2) {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/RestException.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/RestException.java
index 9769588..7ba8d99 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/RestException.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/RestException.java
@@ -20,11 +20,9 @@ package org.apache.pulsar.proxy.stats;
 
 import java.io.PrintWriter;
 import java.io.StringWriter;
-
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
-
 import org.apache.pulsar.common.policies.data.ErrorData;
 
 /**
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/TopicStats.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/TopicStats.java
index 09b04fc..1c808a8 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/TopicStats.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/TopicStats.java
@@ -18,11 +18,9 @@
  */
 package org.apache.pulsar.proxy.stats;
 
-import org.apache.pulsar.common.stats.Rate;
-
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-
 import lombok.Getter;
+import org.apache.pulsar.common.stats.Rate;
 
 @Getter
 @JsonIgnoreProperties(value = { "msgInRate", "msgOutRate" })
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/package-info.java
similarity index 69%
copy from pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java
copy to pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/package-info.java
index 844c7ca..2eb5cf8 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/package-info.java
@@ -16,21 +16,4 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.proxy.extensions;
-
-import lombok.Data;
-import lombok.experimental.Accessors;
-
-import java.util.Map;
-import java.util.TreeMap;
-
-/**
- * The collection of Proxy Extensions.
- */
-@Data
-@Accessors(fluent = true)
-class ExtensionsDefinitions {
-
-    private final Map<String, ProxyExtensionMetadata> extensions = new TreeMap<>();
-
-}
+package org.apache.pulsar.proxy.stats;
\ No newline at end of file
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/util/package-info.java
similarity index 69%
copy from pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java
copy to pulsar-proxy/src/main/java/org/apache/pulsar/proxy/util/package-info.java
index 844c7ca..eddea63 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/util/package-info.java
@@ -16,21 +16,4 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.proxy.extensions;
-
-import lombok.Data;
-import lombok.experimental.Accessors;
-
-import java.util.Map;
-import java.util.TreeMap;
-
-/**
- * The collection of Proxy Extensions.
- */
-@Data
-@Accessors(fluent = true)
-class ExtensionsDefinitions {
-
-    private final Map<String, ProxyExtensionMetadata> extensions = new TreeMap<>();
-
-}
+package org.apache.pulsar.proxy.util;
\ No newline at end of file
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
index 062db18..098d892 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
@@ -91,7 +91,7 @@ public class ProxyConnectionThrottlingTest extends MockedPulsarServiceBaseTest {
                 .operationTimeout(1000, TimeUnit.MILLISECONDS)
                 .build();
 
-        Assert.assertEquals(ProxyService.rejectedConnections.get(), 0.0d);
+        Assert.assertEquals(ProxyService.REJECTED_CONNECTIONS.get(), 0.0d);
         try {
             @Cleanup
             Producer<byte[]> producer2 = client2.newProducer(Schema.BYTES).topic("persistent://sample/test/local/producer-topic-1").create();
@@ -101,7 +101,7 @@ public class ProxyConnectionThrottlingTest extends MockedPulsarServiceBaseTest {
             // OK
         }
         // should add retry count since retry every 100ms and operation timeout is set to 1000ms
-        Assert.assertEquals(ProxyService.rejectedConnections.get(), 5.0d);
+        Assert.assertEquals(ProxyService.REJECTED_CONNECTIONS.get(), 5.0d);
     }
 
     private static final Logger LOG = LoggerFactory.getLogger(ProxyConnectionThrottlingTest.class);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
index fa3c485..25df835 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
@@ -99,7 +99,7 @@ public class ProxyLookupThrottlingTest extends MockedPulsarServiceBaseTest {
         } catch (Exception ex) {
             // Ignore
         }
-        Assert.assertEquals(LookupProxyHandler.rejectedPartitionsMetadataRequests.get(), 5.0d);
+        Assert.assertEquals(LookupProxyHandler.REJECTED_PARTITIONS_METADATA_REQUESTS.get(), 5.0d);
         proxyService.getLookupRequestSemaphore().release();
         try {
             @Cleanup
@@ -109,6 +109,6 @@ public class ProxyLookupThrottlingTest extends MockedPulsarServiceBaseTest {
             Assert.fail("Should not have failed since can acquire LookupRequestSemaphore");
         }
 
-        Assert.assertEquals(LookupProxyHandler.rejectedPartitionsMetadataRequests.get(), 5.0d);
+        Assert.assertEquals(LookupProxyHandler.REJECTED_PARTITIONS_METADATA_REQUESTS.get(), 5.0d);
     }
 }