You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zi...@apache.org on 2022/10/10 03:29:52 UTC

[pulsar] branch master updated: [cleanup][broker][Modernizer] fix violations in pulsar-broker (#17968)

This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 16cceaee6aa [cleanup][broker][Modernizer] fix violations in pulsar-broker (#17968)
16cceaee6aa is described below

commit 16cceaee6aa6aca137398abf5024e55ef5c80b7a
Author: youzipi <bl...@qq.com>
AuthorDate: Mon Oct 10 11:29:43 2022 +0800

    [cleanup][broker][Modernizer] fix violations in pulsar-broker (#17968)
    
    * [cleanup][broker][Modernizer] fix violations in pulsar-broker
---
 pulsar-broker/pom.xml                              |  3 -
 .../broker/intercept/InterceptFilterOutTest.java   |  3 +-
 .../broker/web/ProcessHandlerFilterTest.java       |  3 +-
 .../api/AuthenticatedProducerConsumerTest.java     | 10 +--
 .../AuthenticationTlsHostnameVerificationTest.java |  3 +-
 .../api/AuthorizationProducerConsumerTest.java     |  2 +-
 .../pulsar/client/api/BrokerServiceLookupTest.java | 11 +--
 .../pulsar/client/api/ConsumerRedeliveryTest.java  | 16 ++---
 .../client/api/DispatcherBlockConsumerTest.java    | 27 +++----
 .../client/api/KeySharedSubscriptionTest.java      |  6 +-
 .../client/api/MessageDispatchThrottlingTest.java  |  9 +--
 .../client/api/MutualAuthenticationTest.java       |  2 +-
 .../pulsar/client/api/NonPersistentTopicTest.java  | 15 ++--
 .../api/PartitionedProducerConsumerTest.java       | 24 +++----
 .../client/api/PatternMultiTopicsConsumerTest.java | 14 ++--
 .../apache/pulsar/client/api/RetryTopicTest.java   |  8 +--
 .../client/api/SimpleProducerConsumerStatTest.java | 30 ++++----
 .../client/api/SimpleProducerConsumerTest.java     | 82 +++++++++++-----------
 .../api/SimpleTypedProducerConsumerTest.java       | 14 ++--
 .../TokenAuthenticatedProducerConsumerTest.java    |  2 +-
 ...kenOauth2AuthenticatedProducerConsumerTest.java |  6 +-
 .../apache/pulsar/client/api/TopicReaderTest.java  | 57 +++++++--------
 .../client/api/v1/V1_ProducerConsumerTest.java     | 61 ++++++++--------
 .../client/impl/AdminApiKeyStoreTlsAuthTest.java   | 27 ++++---
 .../client/impl/BrokerClientIntegrationTest.java   |  6 +-
 .../pulsar/client/impl/ConnectionPoolTest.java     |  6 +-
 .../client/impl/KeySharedSubscriptionTest.java     | 22 +++---
 .../pulsar/client/impl/MessageChunkingTest.java    | 20 +++---
 .../pulsar/client/impl/MessageParserTest.java      | 16 ++---
 .../pulsar/client/impl/MultiTopicsReaderTest.java  |  3 +-
 .../org/apache/pulsar/client/impl/ReaderTest.java  |  3 +-
 .../pulsar/client/impl/TopicsConsumerImplTest.java |  4 +-
 .../pulsar/client/impl/ZeroQueueSizeTest.java      |  6 +-
 33 files changed, 246 insertions(+), 275 deletions(-)

diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index ca36f9ca371..a8e39ef7d24 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -436,9 +436,6 @@
         <configuration>
           <failOnViolations>true</failOnViolations>
           <javaVersion>17</javaVersion>
-          <ignorePackages>
-            <ignorePackage>org.apache.pulsar.client</ignorePackage>
-          </ignorePackages>
           <exclusionPatterns>
             <exclusionPattern>java/lang/StringBuffer."&lt;init&gt;":.*</exclusionPattern>
           </exclusionPatterns>
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/InterceptFilterOutTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/InterceptFilterOutTest.java
index 3d8c4732d2a..42305912baa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/InterceptFilterOutTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/InterceptFilterOutTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.intercept;
 
+import java.util.HashSet;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.web.ExceptionHandler;
@@ -151,7 +152,7 @@ public class InterceptFilterOutTest {
         Mockito.doReturn(interceptor).when(pulsarService).getBrokerInterceptor();
         ServiceConfiguration conf = Mockito.mock(ServiceConfiguration.class);
         // Disable the broker interceptor
-        Mockito.doReturn(Sets.newHashSet()).when(conf).getBrokerInterceptors();
+        Mockito.doReturn(new HashSet<>()).when(conf).getBrokerInterceptors();
         Mockito.doReturn(conf).when(pulsarService).getConfig();
         ResponseHandlerFilter filter = new ResponseHandlerFilter(pulsarService);
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/ProcessHandlerFilterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/ProcessHandlerFilterTest.java
index 6de49d9aa56..d6ade32ba6d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/ProcessHandlerFilterTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/ProcessHandlerFilterTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.web;
 
 import java.io.IOException;
+import java.util.HashSet;
 import javax.servlet.FilterChain;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
@@ -58,7 +59,7 @@ public class ProcessHandlerFilterTest {
         FilterChain spyFilterChain = Mockito.spy(FilterChain.class);
         Mockito.doReturn(spyInterceptor).when(mockPulsarService).getBrokerInterceptor();
         Mockito.doReturn(mockConfig).when(mockPulsarService).getConfig();
-        Mockito.doReturn(Sets.newHashSet()).when(mockConfig).getBrokerInterceptors();
+        Mockito.doReturn(new HashSet<>()).when(mockConfig).getBrokerInterceptors();
         // empty interceptor list
         HttpServletRequest mockHttpServletRequest = Mockito.mock(HttpServletRequest.class);
         ProcessHandlerFilter processHandlerFilter = new ProcessHandlerFilter(mockPulsarService);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
index 02791168210..e635de4acd2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
@@ -170,7 +170,7 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
         }
 
         Message<byte[]> msg = null;
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         for (int i = 0; i < 10; i++) {
             msg = consumer.receive(5, TimeUnit.SECONDS);
             String receivedMessage = new String(msg.getData());
@@ -215,7 +215,7 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
         admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
 
         admin.tenants().createTenant("my-property",
-                new TenantInfoImpl(Sets.newHashSet(), Sets.newHashSet("test")));
+                new TenantInfoImpl(new HashSet<>(), Sets.newHashSet("test")));
         admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
 
         testSyncProducerAndConsumer(batchMessageDelayMs);
@@ -233,7 +233,7 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
         admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
 
         admin.tenants().createTenant("my-property",
-                new TenantInfoImpl(Sets.newHashSet(), Sets.newHashSet("test")));
+                new TenantInfoImpl(new HashSet<>(), Sets.newHashSet("test")));
         admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
 
         testSyncProducerAndConsumer(batchMessageDelayMs);
@@ -344,7 +344,7 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
         String topic = "persistent://" + namespace + "1/topic1";
         // this will cause NPE and it should throw 500
         mockZooKeeperGlobal.setAlwaysFail(Code.SESSIONEXPIRED);
-        pulsar.getConfiguration().setSuperUserRoles(Sets.newHashSet());
+        pulsar.getConfiguration().setSuperUserRoles(new HashSet<>());
         try {
             admin.topics().getPartitionedTopicMetadata(topic);
         } catch (PulsarAdminException e) {
@@ -449,7 +449,7 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
         internalSetup(new AuthenticationToken(ADMIN_TOKEN));
         admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
         admin.tenants().createTenant("my-property",
-                new TenantInfoImpl(Sets.newHashSet(), Sets.newHashSet("test")));
+                new TenantInfoImpl(new HashSet<>(), Sets.newHashSet("test")));
         admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
 
         @Cleanup
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java
index c46058a4919..ba061fc40ba 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.client.api;
 
-import com.google.common.collect.Sets;
 import java.lang.reflect.Method;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -206,7 +205,7 @@ public class AuthenticationTlsHostnameVerificationTest extends ProducerConsumerB
         }
 
         Message<byte[]> msg = null;
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         for (int i = 0; i < 10; i++) {
             msg = consumer.receive(5, TimeUnit.SECONDS);
             String receivedMessage = new String(msg.getData());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index f7e6594576e..dde958d518a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -952,7 +952,7 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
 
     public static class TestAuthorizationProviderWithGrantPermission extends TestAuthorizationProvider {
 
-        private Set<String> grantRoles = Sets.newHashSet();
+        private Set<String> grantRoles = new HashSet<>();
         static AuthenticationDataSource authenticationData;
         static String authDataJson;
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index d1aa9c5bc8c..42a97bf0fcb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -46,6 +46,7 @@ import java.security.PrivateKey;
 import java.security.SecureRandom;
 import java.security.cert.Certificate;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -187,7 +188,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
         }
 
         Message<byte[]> msg = null;
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         for (int i = 0; i < 10; i++) {
             msg = consumer.receive(5, TimeUnit.SECONDS);
             String receivedMessage = new String(msg.getData());
@@ -314,7 +315,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
         }
 
         Message<byte[]> msg = null;
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         for (int i = 0; i < 10; i++) {
             msg = consumer.receive(5, TimeUnit.SECONDS);
             String receivedMessage = new String(msg.getData());
@@ -392,7 +393,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
         }
 
         Message<byte[]> msg = null;
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         for (int i = 0; i < 20; i++) {
             msg = consumer.receive(5, TimeUnit.SECONDS);
             assertNotNull(msg, "Message should not be null");
@@ -506,11 +507,11 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
         con.connect();
         log.info("connected url: {} ", con.getURL());
         // assert connect-url: broker2-https
-        assertEquals(new Integer(con.getURL().getPort()), conf2.getWebServicePortTls().get());
+        assertEquals(Integer.valueOf(con.getURL().getPort()), conf2.getWebServicePortTls().get());
         InputStream is = con.getInputStream();
         // assert redirect-url: broker1-https only
         log.info("redirected url: {}", con.getURL());
-        assertEquals(new Integer(con.getURL().getPort()), conf.getWebServicePortTls().get());
+        assertEquals(Integer.valueOf(con.getURL().getPort()), conf.getWebServicePortTls().get());
         is.close();
 
         loadManager1 = null;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
index 95e343acfef..3d931346c89 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
@@ -18,16 +18,19 @@
  */
 package org.apache.pulsar.client.api;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-
 import lombok.Cleanup;
-
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -38,13 +41,6 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.Sets;
-
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertNull;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.assertEquals;
-
 @Test(groups = "broker-api")
 public class ConsumerRedeliveryTest extends ProducerConsumerBase {
 
@@ -105,7 +101,7 @@ public class ConsumerRedeliveryTest extends ProducerConsumerBase {
 
 
         int consumedCount = 0;
-        Set<MessageId> messageIds = Sets.newHashSet();
+        Set<MessageId> messageIds = new HashSet<>();
         for (int i = 0; i < totalMsgs; i++) {
             Message<byte[]> message = consumer1.receive(5, TimeUnit.SECONDS);
             if (message != null && (consumedCount % 2) == 0) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
index 97dd7760519..6c5ac0460d5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
@@ -26,12 +26,13 @@ import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Queues;
 import com.google.common.collect.Sets;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -127,7 +128,7 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
 
             // (2) try to consume messages: but will be able to consume number of messages = unackMsgAllowed
             Message<?> msg = null;
-            Map<Message<?>, Consumer<?>> messages = Maps.newHashMap();
+            Map<Message<?>, Consumer<?>> messages = new HashMap<>();
             for (int i = 0; i < 3; i++) {
                 for (int j = 0; j < totalProducedMsgs; j++) {
                     msg = consumers.get(i).receive(500, TimeUnit.MILLISECONDS);
@@ -337,7 +338,7 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
 
             // (2) try to consume messages: but will be able to consume number of messages = unackMsgAllowed
             Message<?> msg = null;
-            Map<Message<?>, Consumer<?>> messages = Maps.newHashMap();
+            Map<Message<?>, Consumer<?>> messages = new HashMap<>();
             for (int i = 0; i < totalProducedMsgs; i++) {
                 msg = consumer1.receive(500, TimeUnit.MILLISECONDS);
                 if (msg != null) {
@@ -361,7 +362,7 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
                     .subscriptionType(SubscriptionType.Shared)
                     .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
                     .subscribe();
-            Map<Message<?>, Consumer<?>> messages2 = Maps.newHashMap();
+            Map<Message<?>, Consumer<?>> messages2 = new HashMap<>();
             // try to consume remaining messages: broker may take time to deliver so, retry multiple time to consume
             // all messages
             for (int i = 0; i < totalProducedMsgs; i++) {
@@ -422,7 +423,7 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
 
             // (2) try to consume messages: but will be able to consume number of messages = unackMsgAllowed
             Message<?> msg = null;
-            Set<MessageId> messages = Sets.newHashSet();
+            Set<MessageId> messages = new HashSet<>();
             for (int i = 0; i < 3; i++) {
                 for (int j = 0; j < totalProducedMsgs; j++) {
                     msg = consumers.get(i).receive(500, TimeUnit.MILLISECONDS);
@@ -448,12 +449,12 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
             Thread.sleep(1000);
 
             // now, broker must have redelivered all unacked messages
-            Map<ConsumerImpl<?>, Set<MessageId>> messages1 = Maps.newHashMap();
+            Map<ConsumerImpl<?>, Set<MessageId>> messages1 = new HashMap<>();
             for (int i = 0; i < 3; i++) {
                 for (int j = 0; j < totalProducedMsgs; j++) {
                     msg = consumers.get(i).receive(500, TimeUnit.MILLISECONDS);
                     if (msg != null) {
-                        messages1.putIfAbsent(consumers.get(i), Sets.newHashSet());
+                        messages1.putIfAbsent(consumers.get(i), new HashSet<>());
                         messages1.get(consumers.get(i)).add(msg.getMessageId());
                         log.info("Received message: " + new String(msg.getData()));
                     } else {
@@ -462,7 +463,7 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
                 }
             }
 
-            Set<MessageId> result = Sets.newHashSet();
+            Set<MessageId> result = new HashSet<>();
             messages1.values().forEach(result::addAll);
 
             // check all unacked messages have been redelivered
@@ -644,7 +645,7 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
 
         // consumer should only receive unakced messages
         Set<String> unackMsgs = unackMessages.stream().map(i -> "my-message-" + i).collect(Collectors.toSet());
-        Set<String> receivedMsgs = Sets.newHashSet();
+        Set<String> receivedMsgs = new HashSet<>();
         for (int i = 0; i < totalProducedMsgs; i++) {
             Message<?> msg = consumer.receive(500, TimeUnit.MILLISECONDS);
             if (msg == null) {
@@ -745,7 +746,7 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
              * maxUnAckPerBroker limit
              ***/
             Message<byte[]> msg = null;
-            Set<MessageId> messages1 = Sets.newHashSet();
+            Set<MessageId> messages1 = new HashSet<>();
             for (int j = 0; j < totalProducedMsgs; j++) {
                 msg = consumer1Sub1.receive(waitMills, TimeUnit.MILLISECONDS);
                 if (msg != null) {
@@ -793,7 +794,7 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
             ConsumerImpl<byte[]> consumerSub2 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName)
                     .subscriptionName(subscriberName2).receiverQueueSize(receiverQueueSize)
                     .subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
-            Set<MessageId> messages2 = Sets.newHashSet();
+            Set<MessageId> messages2 = new HashSet<>();
             for (int j = 0; j < totalProducedMsgs; j++) {
                 msg = consumerSub2.receive(waitMills, TimeUnit.MILLISECONDS);
                 if (msg != null) {
@@ -954,7 +955,7 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
              * maxUnAckPerBroker limit
              ***/
             Message<?> msg = null;
-            Set<MessageId> messages1 = Sets.newHashSet();
+            Set<MessageId> messages1 = new HashSet<>();
             for (int j = 0; j < totalProducedMsgs; j++) {
                 msg = consumer1Sub1.receive(100, TimeUnit.MILLISECONDS);
                 if (msg != null) {
@@ -1006,7 +1007,7 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
                     .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
                     .subscriptionName(subscriberName2).receiverQueueSize(receiverQueueSize)
                     .subscriptionType(SubscriptionType.Shared).subscribe();
-            Set<MessageId> messages2 = Sets.newHashSet();
+            Set<MessageId> messages2 = new HashSet<>();
             for (int j = 0; j < totalProducedMsgs; j++) {
                 msg = consumer1Sub2.receive(100, TimeUnit.MILLISECONDS);
                 if (msg != null) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 8742b8798b4..e2198deb294 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -24,7 +24,6 @@ import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
@@ -32,6 +31,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -1320,7 +1320,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
                         .compareTo(lastMessageForKey.get(key).getValue()) > 0);
                 }
                 lastMessageForKey.put(key, message);
-                consumerKeys.putIfAbsent(check.getKey(), Sets.newHashSet());
+                consumerKeys.putIfAbsent(check.getKey(), new HashSet<>());
                 consumerKeys.get(check.getKey()).add(key);
                 received++;
             }
@@ -1353,7 +1353,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
             Assert.assertNull(noMessages, "redeliver too many messages.");
             Assert.assertEquals((check.getValue() + redeliveryCount), received);
         }
-        Set<String> allKeys = Sets.newHashSet();
+        Set<String> allKeys = new HashSet<>();
         consumerKeys.forEach((k, v) -> v.forEach(key -> {
             assertTrue(allKeys.add(key),
                 "Key "+ key +  "is distributed to multiple consumers." );
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
index 23b4ea4f381..d024d43eb73 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
@@ -25,12 +25,7 @@ import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-
-import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-
-import lombok.Cleanup;
-
 import java.lang.reflect.Field;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -42,7 +37,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-
+import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.cache.PendingReadsManager;
@@ -158,7 +153,7 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
         Assert.assertTrue(isDispatchRateUpdate);
         Assert.assertEquals(admin.namespaces().getDispatchRate(namespace), dispatchRate);
         Policies policies = admin.namespaces().getPolicies(namespace);
-        Map<String, DispatchRate> dispatchRateMap = Maps.newHashMap();
+        Map<String, DispatchRate> dispatchRateMap = new HashMap<>();
         dispatchRateMap.put("test", dispatchRate);
         Assert.assertEquals(policies.clusterDispatchRate, dispatchRateMap);
         Assert.assertEquals(policies.topicDispatchRate, dispatchRateMap);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
index 0dc5a1f6c44..6dc901d53e2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
@@ -227,7 +227,7 @@ public class MutualAuthenticationTest extends ProducerConsumerBase {
             producer.send(message.getBytes());
         }
         Message<byte[]> msg = null;
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         for (int i = 0; i < 10; i++) {
             msg = consumer.receive(5, TimeUnit.SECONDS);
             String receivedMessage = new String(msg.getData());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
index 67bc7c0a309..677cca30c7b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
@@ -29,6 +29,7 @@ import static org.testng.Assert.fail;
 import com.google.common.collect.Sets;
 import java.lang.reflect.Field;
 import java.net.URL;
+import java.util.HashSet;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
@@ -175,7 +176,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
         producer.flush();
 
         Message<?> msg = null;
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         for (int i = 0; i < totalProduceMsg; i++) {
             msg = consumer.receive(1, TimeUnit.SECONDS);
             if (msg != null) {
@@ -218,7 +219,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
         producer.flush();
 
         Message<?> msg = null;
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         for (int i = 0; i < totalProduceMsg; i++) {
             msg = consumer.receive(1, TimeUnit.SECONDS);
             if (msg != null) {
@@ -274,7 +275,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
         producer.flush();
 
         Message<?> msg = null;
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         for (int i = 0; i < totalProduceMsg; i++) {
             msg = consumer.receive(1, TimeUnit.SECONDS);
             if (msg != null) {
@@ -316,7 +317,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
         producer.flush();
 
         Message<?> msg = null;
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         for (int i = 0; i < totalProduceMsg; i++) {
             msg = consumer.receive(1, TimeUnit.SECONDS);
             if (msg != null) {
@@ -373,7 +374,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
             latch.await();
 
             Message<?> msg = null;
-            Set<String> messageSet = Sets.newHashSet();
+            Set<String> messageSet = new HashSet<>();
             for (int i = 0; i < totalProduceMessages; i++) {
                 msg = consumer.receive(500, TimeUnit.MILLISECONDS);
                 if (msg != null) {
@@ -427,7 +428,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
 
         // consume from shared-subscriptions
         Message<?> msg = null;
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         for (int i = 0; i < totalProduceMsg; i++) {
             msg = consumer1Shared.receive(500, TimeUnit.MILLISECONDS);
             if (msg != null) {
@@ -598,7 +599,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
 
             // (1) consume by consumer1
             Message<?> msg = null;
-            Set<String> messageSet = Sets.newHashSet();
+            Set<String> messageSet = new HashSet<>();
             for (int i = 0; i < totalProducedMessages; i++) {
                 msg = consumer1.receive(300, TimeUnit.MILLISECONDS);
                 if (msg != null) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
index 5f229dd580e..472f08fa962 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
@@ -22,10 +22,11 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-
 import io.netty.util.Timeout;
+import io.netty.util.concurrent.DefaultThreadFactory;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -35,7 +36,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.client.impl.PartitionedProducerImpl;
@@ -50,10 +50,6 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.Sets;
-
-import io.netty.util.concurrent.DefaultThreadFactory;
-
 @Test(groups = "broker-api")
 public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
     private static final Logger log = LoggerFactory.getLogger(PartitionedProducerConsumerTest.class);
@@ -100,7 +96,7 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
         }
 
         Message<byte[]> msg;
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         for (int i = 0; i < 10; i++) {
             msg = consumer.receive(5, TimeUnit.SECONDS);
             Assert.assertNotNull(msg, "Message should not be null");
@@ -166,7 +162,7 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
             }
 
             Message<byte[]> msg;
-            Set<String> messageSet = Sets.newHashSet();
+            Set<String> messageSet = new HashSet<>();
 
             for (int i = 0; i < MESSAGE_COUNT; i++) {
                 msg = consumer.receive(5, TimeUnit.SECONDS);
@@ -211,7 +207,7 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
         }
 
         Message<byte[]> msg;
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
 
         for (int i = 0; i < 10; i++) {
             msg = consumer.receive(5, TimeUnit.SECONDS);
@@ -258,7 +254,7 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
             producer.newMessage().key(dummyKey2).value(message.getBytes()).send();
         }
 
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         for (int i = 0; i < 10; i++) {
             Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
             Assert.assertNotNull(msg, "Message should not be null");
@@ -519,8 +515,8 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
         log.info("-- Starting {} test --", methodName);
 
         final int totalMsg = 100;
-        final Set<String> produceMsgs = Sets.newHashSet();
-        final Set<String> consumeMsgs = Sets.newHashSet();
+        final Set<String> produceMsgs = new HashSet<>();
+        final Set<String> consumeMsgs = new HashSet<>();
 
         int numPartitions = 4;
         TopicName topicName = TopicName
@@ -569,8 +565,8 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
         PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
 
         final int totalMsg = 100;
-        final Set<String> produceMsgs = Sets.newHashSet();
-        final Set<String> consumeMsgs = Sets.newHashSet();
+        final Set<String> produceMsgs = new HashSet<>();
+        final Set<String> consumeMsgs = new HashSet<>();
 
         int numPartitions = 4;
         TopicName topicName = TopicName
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternMultiTopicsConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternMultiTopicsConsumerTest.java
index a78b804b9b1..dc8cecdadc4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternMultiTopicsConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternMultiTopicsConsumerTest.java
@@ -18,17 +18,17 @@
  */
 package org.apache.pulsar.client.api;
 
-import com.google.common.collect.Maps;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
 
 @Test(groups = "broker")
 public class PatternMultiTopicsConsumerTest extends ProducerConsumerBase {
@@ -71,7 +71,7 @@ public class PatternMultiTopicsConsumerTest extends ProducerConsumerBase {
     }
 
     private void testWithConsumer(Consumer<byte[]> consumer) throws Exception {
-        Map<String, List<String>> sentMessages = Maps.newHashMap();
+        Map<String, List<String>> sentMessages = new HashMap<>();
         for (int p = 0; p < 10; ++p) {
             String name = "persistent://my-property/my-ns/topic-" + p;
             Producer<byte[]> producer = pulsarClient.newProducer().topic(name).create();
@@ -83,7 +83,7 @@ public class PatternMultiTopicsConsumerTest extends ProducerConsumerBase {
         }
 
         Message<byte[]> msg;
-        Map<String, List<String>> receivedMessages = Maps.newHashMap();
+        Map<String, List<String>> receivedMessages = new HashMap<>();
         for (int i = 0; i < 100; i++) {
             msg = consumer.receive(5, TimeUnit.SECONDS);
             String receivedMessage = new String(msg.getData());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
index fecf545a678..5f02219aac3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
@@ -21,9 +21,9 @@ package org.apache.pulsar.client.api;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.fail;
-import com.google.common.collect.Sets;
 import java.lang.reflect.Field;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -158,14 +158,14 @@ public class RetryTopicTest extends ProducerConsumerBase {
                 .topic(topic)
                 .create();
 
-        Set<String> originMessageIds = Sets.newHashSet();
+        Set<String> originMessageIds = new HashSet<>();
         for (int i = 0; i < sendMessages; i++) {
             MessageId msgId = producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
             originMessageIds.add(msgId.toString());
         }
 
         int totalReceived = 0;
-        Set<String> retryMessageIds = Sets.newHashSet();
+        Set<String> retryMessageIds = new HashSet<>();
         do {
             Message<byte[]> message = consumer.receive();
             log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
@@ -183,7 +183,7 @@ public class RetryTopicTest extends ProducerConsumerBase {
         assertEquals(retryMessageIds, originMessageIds);
 
         int totalInDeadLetter = 0;
-        Set<String> deadLetterMessageIds = Sets.newHashSet();
+        Set<String> deadLetterMessageIds = new HashSet<>();
         do {
             Message message = deadLetterConsumer.receive();
             log.info("dead letter consumer received message : {} {}", message.getMessageId(),
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
index 40d34a7c0b7..7cfa35c9d1c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
@@ -23,7 +23,12 @@ import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-
+import com.google.common.util.concurrent.RateLimiter;
+import com.google.gson.Gson;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -32,7 +37,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-
+import lombok.Cleanup;
 import org.apache.pulsar.broker.stats.NamespaceStats;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.awaitility.Awaitility;
@@ -43,15 +48,6 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.RateLimiter;
-import com.google.gson.Gson;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonObject;
-
-import lombok.Cleanup;
-
 @Test(groups = "broker-api")
 public class SimpleProducerConsumerStatTest extends ProducerConsumerBase {
     private static final Logger log = LoggerFactory.getLogger(SimpleProducerConsumerStatTest.class);
@@ -114,7 +110,7 @@ public class SimpleProducerConsumerStatTest extends ProducerConsumerBase {
         }
 
         Message<byte[]> msg = null;
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         for (int i = 0; i < numMessages; i++) {
             msg = consumer.receive(5, TimeUnit.SECONDS);
             String receivedMessage = new String(msg.getData());
@@ -153,7 +149,7 @@ public class SimpleProducerConsumerStatTest extends ProducerConsumerBase {
         }
 
         Producer<byte[]> producer = producerBuilder.create();
-        List<Future<MessageId>> futures = Lists.newArrayList();
+        List<Future<MessageId>> futures = new ArrayList<>();
 
         int numMessages = 50;
         // Asynchronously produce messages
@@ -169,7 +165,7 @@ public class SimpleProducerConsumerStatTest extends ProducerConsumerBase {
         }
 
         Message<byte[]> msg = null;
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         for (int i = 0; i < numMessages; i++) {
             msg = consumer.receive(5, TimeUnit.SECONDS);
             String receivedMessage = new String(msg.getData());
@@ -211,7 +207,7 @@ public class SimpleProducerConsumerStatTest extends ProducerConsumerBase {
         }
 
         Producer<byte[]> producer = producerBuilder.create();
-        List<Future<MessageId>> futures = Lists.newArrayList();
+        List<Future<MessageId>> futures = new ArrayList<>();
 
         int numMessages = 101;
         // Asynchronously produce messages
@@ -227,7 +223,7 @@ public class SimpleProducerConsumerStatTest extends ProducerConsumerBase {
         }
         Message<byte[]> msg = null;
         CompletableFuture<Message<byte[]>> future_msg = null;
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         for (int i = 0; i < numMessages; i++) {
             future_msg = consumer.receiveAsync();
             Thread.sleep(10);
@@ -274,7 +270,7 @@ public class SimpleProducerConsumerStatTest extends ProducerConsumerBase {
         }
 
         Producer<byte[]> producer = producerBuilder.create();
-        List<Future<MessageId>> futures = Lists.newArrayList();
+        List<Future<MessageId>> futures = new ArrayList<>();
 
         // Asynchronously produce messages
         for (int i = 0; i < numMessages; i++) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 669161f67c0..08ca6e80a22 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -36,7 +36,6 @@ import static org.testng.Assert.fail;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.JsonNodeType;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
@@ -50,6 +49,7 @@ import java.time.Clock;
 import java.time.Instant;
 import java.time.ZoneId;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -336,7 +336,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         }
 
         Message<byte[]> msg = null;
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         for (int i = 0; i < 10; i++) {
             msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
             String receivedMessage = new String(msg.getData());
@@ -366,7 +366,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
             producerBuilder.batchingMaxMessages(5);
         }
         Producer<byte[]> producer = producerBuilder.create();
-        List<Future<MessageId>> futures = Lists.newArrayList();
+        List<Future<MessageId>> futures = new ArrayList<>();
 
         // Asynchronously produce messages
         for (int i = 0; i < 10; i++) {
@@ -381,7 +381,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         }
 
         Message<byte[]> msg = null;
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         for (int i = 0; i < 10; i++) {
             msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
             String receivedMessage = new String(msg.getData());
@@ -423,7 +423,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
             producerBuilder.batchingMaxMessages(5);
         }
         Producer<byte[]> producer = producerBuilder.create();
-        List<Future<MessageId>> futures = Lists.newArrayList();
+        List<Future<MessageId>> futures = new ArrayList<>();
 
         // Asynchronously produce messages
         for (int i = 0; i < numMessages; i++) {
@@ -1206,7 +1206,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         Thread.sleep(100);
 
         // 5. verify: active subscribers
-        Set<String> activeSubscriber = Sets.newHashSet();
+        Set<String> activeSubscriber = new HashSet<>();
         ledger.getActiveCursors().forEach(c -> activeSubscriber.add(c.getName()));
         assertTrue(activeSubscriber.contains(sub1));
         assertFalse(activeSubscriber.contains(sub2));
@@ -1231,8 +1231,8 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         log.info("-- Starting {} test --", methodName);
 
         final int totalMsg = 100;
-        final Set<String> produceMsgs = Sets.newHashSet();
-        final Set<String> consumeMsgs = Sets.newHashSet();
+        final Set<String> produceMsgs = new HashSet<>();
+        final Set<String> consumeMsgs = new HashSet<>();
 
         Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic1")
                 .subscriptionName("my-subscriber-name").subscribe();
@@ -1271,8 +1271,8 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         log.info("-- Starting {} test --", methodName);
 
         final int totalMsg = 100;
-        final Set<String> produceMsgs = Sets.newHashSet();
-        final Set<String> consumeMsgs = Sets.newHashSet();
+        final Set<String> produceMsgs = new HashSet<>();
+        final Set<String> consumeMsgs = new HashSet<>();
         Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic1")
                 .subscriptionName("my-subscriber-name").subscribe();
 
@@ -1399,8 +1399,8 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         }
 
         Message<byte[]> msg;
-        Set<Message<byte[]>> consumerMsgSet1 = Sets.newHashSet();
-        Set<Message<byte[]>> consumerMsgSet2 = Sets.newHashSet();
+        Set<Message<byte[]>> consumerMsgSet1 = new HashSet<>();
+        Set<Message<byte[]>> consumerMsgSet2 = new HashSet<>();
         for (int i = 0; i < 5; i++) {
             msg = consumer1.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
             consumerMsgSet1.add(msg);
@@ -1504,7 +1504,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
 
             // (2) try to consume messages: but will be able to consume number of messages = unAckedMessagesBufferSize
             Message<byte[]> msg;
-            List<Message<byte[]>> messages = Lists.newArrayList();
+            List<Message<byte[]>> messages = new ArrayList<>();
             for (int i = 0; i < totalProducedMsgs; i++) {
                 msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                 if (msg != null) {
@@ -1583,7 +1583,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
             for (int j = 0; j < totalReceiveIteration; j++) {
 
                 Message<byte[]> msg;
-                List<Message<byte[]>> messages = Lists.newArrayList();
+                List<Message<byte[]>> messages = new ArrayList<>();
                 for (int i = 0; i < totalProducedMsgs; i++) {
                     msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                     if (msg != null) {
@@ -1701,7 +1701,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
             // (2) Consumer1: consume without ack:
             // try to consume messages: but will be able to consume number of messages = maxUnackedMessages
             Message<byte[]> msg;
-            List<Message<byte[]>> messages = Lists.newArrayList();
+            List<Message<byte[]>> messages = new ArrayList<>();
             for (int i = 0; i < totalProducedMsgs; i++) {
                 msg = consumer1.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                 if (msg != null) {
@@ -1848,7 +1848,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
 
             // (2) try to consume messages: but will be able to consume number of messages = unAckedMessagesBufferSize
             Message<byte[]> msg;
-            List<Message<byte[]>> messages = Lists.newArrayList();
+            List<Message<byte[]>> messages = new ArrayList<>();
             for (int i = 0; i < totalProducedMsgs; i++) {
                 msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                 if (msg != null) {
@@ -1918,7 +1918,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
 
             Producer<byte[]> producer = producerBuidler.create();
 
-            List<CompletableFuture<MessageId>> futures = Lists.newArrayList();
+            List<CompletableFuture<MessageId>> futures = new ArrayList<>();
             // (1) Produced Messages
             for (int i = 0; i < totalProducedMsgs; i++) {
                 String message = "my-message-" + i;
@@ -1930,7 +1930,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
             // (2) Consumer1: consume without ack:
             // try to consume messages: but will be able to consume number of messages = maxUnackedMessages
             Message<byte[]> msg;
-            List<Message<byte[]>> messages = Lists.newArrayList();
+            List<Message<byte[]>> messages = new ArrayList<>();
             for (int i = 0; i < totalProducedMsgs; i++) {
                 msg = consumer1.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                 if (msg != null) {
@@ -2012,7 +2012,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
             // (2) Consumer1: consume without ack:
             // try to consume messages: but will be able to consume number of messages = maxUnackedMessages
             Message<byte[]> msg;
-            List<Message<byte[]>> messages = Lists.newArrayList();
+            List<Message<byte[]>> messages = new ArrayList<>();
             for (int i = 0; i < totalProducedMsgs; i++) {
                 msg = consumer1.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                 if (msg != null) {
@@ -2091,7 +2091,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         }
 
         Message<byte[]> msg = null;
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         for (int i = 0; i < totalMsg; i++) {
             msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
             String receivedMessage = new String(msg.getData());
@@ -2139,7 +2139,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
 
             // (2) try to consume messages: but will be able to consume number of messages = unAckedMessagesBufferSize
             Message<byte[]> msg;
-            List<Message<byte[]>> messages1 = Lists.newArrayList();
+            List<Message<byte[]>> messages1 = new ArrayList<>();
             for (int i = 0; i < totalProducedMsgs; i++) {
                 msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                 if (msg != null) {
@@ -2159,7 +2159,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
             consumer.redeliverUnacknowledgedMessages(Sets.newHashSet(redeliveryMessages));
             Thread.sleep(1000);
 
-            Set<MessageIdImpl> messages2 = Sets.newHashSet();
+            Set<MessageIdImpl> messages2 = new HashSet<>();
             for (int i = 0; i < totalProducedMsgs; i++) {
                 msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                 if (msg != null) {
@@ -2226,7 +2226,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
 
             // (2) try to consume messages: but will be able to consume number of messages = unAckedMessagesBufferSize
             Message<byte[]> msg;
-            List<Message<byte[]>> messages1 = Lists.newArrayList();
+            List<Message<byte[]>> messages1 = new ArrayList<>();
             for (int i = 0; i < totalProducedMsgs; i++) {
                 msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                 if (msg != null) {
@@ -2245,7 +2245,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
             consumer.redeliverUnacknowledgedMessages(Sets.newHashSet(redeliveryMessages));
             Thread.sleep(1000);
 
-            Set<MessageIdImpl> messages2 = Sets.newHashSet();
+            Set<MessageIdImpl> messages2 = new HashSet<>();
             for (int i = 0; i < totalProducedMsgs; i++) {
                 msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                 if (msg != null) {
@@ -2297,7 +2297,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
 
         Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic2")
                 .create();
-        List<Future<MessageId>> futures = Lists.newArrayList();
+        List<Future<MessageId>> futures = new ArrayList<>();
 
         // Asynchronously produce messages
         for (int i = 0; i < 15; i++) {
@@ -2375,7 +2375,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
                 .topic("persistent://my-property/my-ns/my-topic2").subscriptionName("my-subscriber-name")
                 .subscriptionType(SubscriptionType.Shared).receiverQueueSize(queueSize)
                 .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
-        List<Future<MessageId>> futures = Lists.newArrayList();
+        List<Future<MessageId>> futures = new ArrayList<>();
 
         // Asynchronously produce messages
         final int totalPublishMessages = 500;
@@ -2390,7 +2390,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
             future.get();
         }
 
-        List<Message<byte[]>> messages = Lists.newArrayList();
+        List<Message<byte[]>> messages = new ArrayList<>();
 
         // let consumer1 and consumer2 consume messages up to the queue will be full
         for (int i = 0; i < totalPublishMessages; i++) {
@@ -2503,7 +2503,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         producer.flush();
         // (1.a) consume first consumeMsgInParts msgs and trigger redeliver
         Message<byte[]> msg;
-        List<Message<byte[]>> messages1 = Lists.newArrayList();
+        List<Message<byte[]>> messages1 = new ArrayList<>();
         for (int i = 0; i < consumeMsgInParts; i++) {
             //There is some detailed info about this case.
             //https://github.com/apache/pulsar/pull/15088#issuecomment-1113521990
@@ -2635,7 +2635,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
 
         final int totalMsg = 10;
 
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
 
         Consumer<byte[]> cryptoConsumer = pulsarClient.newConsumer()
                 .topic(topicName).subscriptionName("my-subscriber-name")
@@ -2717,7 +2717,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
 
         final int totalMsg = 10;
 
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/myrsa-topic1")
                 .subscriptionName("my-subscriber-name").cryptoKeyReader(new EncKeyReader()).subscribe();
         Consumer<byte[]> normalConsumer = pulsarClient.newConsumer()
@@ -2805,10 +2805,10 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         final String rsaPrivateKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlFb3dJQkFBS0NBUUVBdEtXd2dxZG5UWXJPQ3YrajFNa1RXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuCmIxMEpjRmY1Wmp6UDlCU1hLK3RIbUk4dW9OMzY4dkV2NnloVVJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0MKN3FkakNON0xESjNNbnFpQklyVXNTYUVQMXdyTnNCMWtJK285RVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVlawpIcUw3c0JsSjk4aDZObXNpY0VhVWthcmRrMFRPWHJsa2pDK2NNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRuMXZ0Cmhwdm5YTHZDbUc0TSs2eHRZdEQ [...]
         final int numMsg = 10;
 
-        Map<String, String> privateKeyFileMap = Maps.newHashMap();
+        Map<String, String> privateKeyFileMap = new HashMap<>();
         privateKeyFileMap.put("client-ecdsa.pem", ecdsaPrivateKeyFile);
         privateKeyFileMap.put("client-rsa.pem", rsaPrivateKeyFile);
-        Map<String, String> privateKeyDataMap = Maps.newHashMap();
+        Map<String, String> privateKeyDataMap = new HashMap<>();
         privateKeyDataMap.put("client-ecdsa.pem", ecdsaPrivateKeyData);
         privateKeyDataMap.put("client-rsa.pem", rsaPrivateKeyData);
 
@@ -2894,7 +2894,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
 
         final String encryptionKeyName = "client-rsa.pem";
         final String encryptionKeyVersion = "1.0";
-        Map<String, String> metadata = Maps.newHashMap();
+        Map<String, String> metadata = new HashMap<>();
         metadata.put("version", encryptionKeyVersion);
         class EncKeyReader implements CryptoKeyReader {
             final EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
@@ -3073,7 +3073,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         final int totalMsg = 10;
 
         MessageImpl<byte[]> msg;
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         Consumer<byte[]> consumer = pulsarClient.newConsumer()
                 .topic("persistent://my-property/use/myenc-ns/myenc-topic1").subscriptionName("my-subscriber-name")
                 .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
@@ -3166,7 +3166,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
 
         final String encryptionKeyName = "client-rsa.pem";
         final String encryptionKeyVersion = "1.0";
-        Map<String, String> metadata = Maps.newHashMap();
+        Map<String, String> metadata = new HashMap<>();
         metadata.put("version", encryptionKeyVersion);
         class EncKeyReader implements CryptoKeyReader {
             final EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
@@ -3505,7 +3505,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         }
 
         Message<byte[]> msg = null;
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         for (int i = 0; i < 10; i++) {
             msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
             String receivedMessage = new String(msg.getData());
@@ -3541,7 +3541,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         }
 
         Message<byte[]> msg = null;
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         for (int i = 0; i < 10; i++) {
             msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
             String receivedMessage = new String(msg.getData());
@@ -3651,7 +3651,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         Integer evenDistributionCount = noOfPartitions / 3;
         retryStrategically((test) -> {
             try {
-                Map<String, Integer> subsCount = Maps.newHashMap();
+                Map<String, Integer> subsCount = new HashMap<>();
                 admin.topics().getPartitionedStats(topicName, true).getPartitions().forEach((p, stats) -> {
                     String activeConsumerName = stats.getSubscriptions().entrySet().iterator().next()
                             .getValue().getActiveConsumerName();
@@ -3667,7 +3667,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
             return false;
         }, 5, 100);
 
-        Map<String, Integer> subsCount = Maps.newHashMap();
+        Map<String, Integer> subsCount = new HashMap<>();
         admin.topics().getPartitionedStats(topicName, true).getPartitions().forEach((p, stats) -> {
             String activeConsumerName = stats.getSubscriptions().entrySet().iterator().next().getValue().getActiveConsumerName();
             subsCount.compute(activeConsumerName, (k, v) -> v != null ? v + 1 : 1);
@@ -3810,7 +3810,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         Consumer<byte[]> consumer = consumerBuilder.subscriptionName("my-subscriber-name").subscribe();
         consumer.seek(resetPos.get());
         log.info("reset cursor to {}", resetPos.get());
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         for (int i = firstMessage; i < numOfMessages; i++) {
             Message<byte[]> message = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
             String receivedMessage = new String(message.getData());
@@ -3843,7 +3843,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
                 .topic("persistent://my-property/my-ns/my-topic2");
 
         Producer<byte[]> producer = producerBuilder.create();
-        List<Future<MessageId>> futures = Lists.newArrayList();
+        List<Future<MessageId>> futures = new ArrayList<>();
 
         // Asynchronously produce messages
         byte[] message = new byte[ClientCnx.getMaxMessageSize() + 1];
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
index dc6041186bb..c5a7de1abec 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
@@ -23,10 +23,10 @@ import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 import java.time.Clock;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -91,7 +91,7 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
         }
 
         Message<JsonEncodedPojo> msg = null;
-        Set<JsonEncodedPojo> messageSet = Sets.newHashSet();
+        Set<JsonEncodedPojo> messageSet = new HashSet<>();
         for (int i = 0; i < 10; i++) {
             msg = consumer.receive(5, TimeUnit.SECONDS);
             JsonEncodedPojo receivedMessage = msg.getValue();
@@ -207,7 +207,7 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
         }
 
         Message<org.apache.pulsar.client.api.schema.proto.Test.TestMessage> msg = null;
-        Set<org.apache.pulsar.client.api.schema.proto.Test.TestMessage> messageSet = Sets.newHashSet();
+        Set<org.apache.pulsar.client.api.schema.proto.Test.TestMessage> messageSet = new HashSet<>();
         for (int i = 0; i < 10; i++) {
             msg = consumer.receive(5, TimeUnit.SECONDS);
             org.apache.pulsar.client.api.schema.proto.Test.TestMessage receivedMessage = msg.getValue();
@@ -287,7 +287,7 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
        }
 
        Message<AvroEncodedPojo> msg = null;
-       Set<AvroEncodedPojo> messageSet = Sets.newHashSet();
+       Set<AvroEncodedPojo> messageSet = new HashSet<>();
        for (int i = 0; i < 10; i++) {
            msg = consumer.receive(5, TimeUnit.SECONDS);
            AvroEncodedPojo receivedMessage = msg.getValue();
@@ -457,7 +457,7 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
            .subscribe();
 
        Message<GenericRecord> msg = null;
-       Set<String> messageSet = Sets.newHashSet();
+       Set<String> messageSet = new HashSet<>();
        for (int i = 0; i < 10; i++) {
            msg = consumer.receive(5, TimeUnit.SECONDS);
            GenericRecord receivedMessage = msg.getValue();
@@ -505,7 +505,7 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
            .create();
 
        Message<GenericRecord> msg = null;
-       Set<String> messageSet = Sets.newHashSet();
+       Set<String> messageSet = new HashSet<>();
        for (int i = 0; i < 10; i++) {
            msg = reader.readNext();
            GenericRecord receivedMessage = msg.getValue();
@@ -578,7 +578,7 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
             .subscribe();
 
         Message<GenericRecord> msg = null;
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         for (int i = 0; i < 20; i++) {
             msg = consumer.receive(5, TimeUnit.SECONDS);
             GenericRecord receivedMessage = msg.getValue();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java
index 73d89b2356c..3170a101ff8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java
@@ -138,7 +138,7 @@ public class TokenAuthenticatedProducerConsumerTest extends ProducerConsumerBase
         }
 
         Message<byte[]> msg = null;
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         for (int i = 0; i < 10; i++) {
             msg = consumer.receive(5, TimeUnit.SECONDS);
             String receivedMessage = new String(msg.getData());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
index 5d2e473d582..cc1bd8cbe59 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
@@ -132,7 +132,7 @@ public class TokenOauth2AuthenticatedProducerConsumerTest extends ProducerConsum
         }
 
         Message<byte[]> msg = null;
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         for (int i = 0; i < 10; i++) {
             msg = consumer.receive(5, TimeUnit.SECONDS);
             String receivedMessage = new String(msg.getData());
@@ -184,7 +184,7 @@ public class TokenOauth2AuthenticatedProducerConsumerTest extends ProducerConsum
         }
 
         Message<byte[]> msg = null;
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         for (int i = 0; i < 10; i++) {
             msg = consumer.receive(5, TimeUnit.SECONDS);
             String receivedMessage = new String(msg.getData());
@@ -221,7 +221,7 @@ public class TokenOauth2AuthenticatedProducerConsumerTest extends ProducerConsum
         }
 
         msg = null;
-        messageSet = Sets.newHashSet();
+        messageSet = new HashSet<>();
         for (int i = 0; i < 10; i++) {
             msg = consumer.receive(5, TimeUnit.SECONDS);
             String receivedMessage = new String(msg.getData());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
index 72252309e65..618cd1aa1aa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
@@ -24,13 +24,12 @@ import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -40,9 +39,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
-
 import lombok.Cleanup;
-
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
@@ -129,7 +126,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
         }
 
         Message<byte[]> msg = null;
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         for (int i = 0; i < 10; i++) {
             msg = reader.readNext(1, TimeUnit.SECONDS);
 
@@ -160,7 +157,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
         }
 
         Message<byte[]> msg = null;
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         for (int i = 0; i < 10; i++) {
             msg = reader.readNext(1, TimeUnit.SECONDS);
             String receivedMessage = new String(msg.getData());
@@ -185,7 +182,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
                 .startMessageId(MessageId.earliest).create();
 
         Message<byte[]> msg = null;
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         for (int i = 0; i < 10; i++) {
             msg = reader.readNext(1, TimeUnit.SECONDS);
 
@@ -215,7 +212,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
                 .startMessageId(MessageId.earliest).create();
 
         Message<byte[]> msg = null;
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         for (int i = 0; i < 10; i++) {
             msg = reader.readNext(1, TimeUnit.SECONDS);
 
@@ -243,7 +240,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
                 .startMessageId(MessageId.earliest).create();
 
         Message<byte[]> msg = null;
-        Set<String> messageSet1 = Sets.newHashSet();
+        Set<String> messageSet1 = new HashSet<>();
         for (int i = 0; i < 10; i++) {
             msg = reader1.readNext(1, TimeUnit.SECONDS);
 
@@ -253,7 +250,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
             testMessageOrderAndDuplicates(messageSet1, receivedMessage, expectedMessage);
         }
 
-        Set<String> messageSet2 = Sets.newHashSet();
+        Set<String> messageSet2 = new HashSet<>();
         for (int i = 0; i < 10; i++) {
             msg = reader2.readNext(1, TimeUnit.SECONDS);
 
@@ -286,14 +283,14 @@ public class TopicReaderTest extends ProducerConsumerBase {
                 .startMessageId(MessageId.earliest).create();
 
         Message<byte[]> msg = null;
-        Set<String> messageSet1 = Sets.newHashSet();
+        Set<String> messageSet1 = new HashSet<>();
         for (int i = 0; i < 10; i++) {
             msg = reader1.readNext(1, TimeUnit.SECONDS);
             String receivedMessage = new String(msg.getData());
             assertTrue(messageSet1.add(receivedMessage));
         }
 
-        Set<String> messageSet2 = Sets.newHashSet();
+        Set<String> messageSet2 = new HashSet<>();
         for (int i = 0; i < 10; i++) {
             msg = reader2.readNext(1, TimeUnit.SECONDS);
 
@@ -377,7 +374,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
         }
 
         // Publish more messages and verify the readers only sees new messages
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         for (int i = halfOfMsgs; i < numOfMessages; i++) {
             Message<byte[]> message = reader.readNext();
             String receivedMessage = new String(message.getData());
@@ -426,7 +423,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
         }
 
         // Publish more messages and verify the readers only sees new messages
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         for (int i = halfOfMsgs; i < numOfMessages; i++) {
             Message<byte[]> message = reader.readNext();
             assertFalse(oldMessage.contains(message));
@@ -457,7 +454,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
 
         // Publish more messages and verify the readers only sees messages starting from the intended message
         Message<byte[]> msg = null;
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         for (int i = 5; i < 10; i++) {
             msg = reader.readNext(1, TimeUnit.SECONDS);
 
@@ -557,7 +554,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
 
         final int totalMsg = 10;
 
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         Reader<byte[]> reader = pulsarClient.newReader()
                 .topic("persistent://my-property/my-ns/test-reader-myecdsa-topic1").startMessageId(MessageId.latest)
                 .cryptoKeyReader(new EncKeyReader()).create();
@@ -627,7 +624,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
 
         final int totalMsg = 10;
 
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         String topic = "persistent://my-property/my-ns/test-multi-reader-myecdsa-topic1";
         admin.topics().createPartitionedTopic(topic, 3);
         Reader<byte[]> reader = pulsarClient.newReader()
@@ -667,10 +664,10 @@ public class TopicReaderTest extends ProducerConsumerBase {
         final String rsaPrivateKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlFb3dJQkFBS0NBUUVBdEtXd2dxZG5UWXJPQ3YrajFNa1RXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuCmIxMEpjRmY1Wmp6UDlCU1hLK3RIbUk4dW9OMzY4dkV2NnloVVJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0MKN3FkakNON0xESjNNbnFpQklyVXNTYUVQMXdyTnNCMWtJK285RVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVlawpIcUw3c0JsSjk4aDZObXNpY0VhVWthcmRrMFRPWHJsa2pDK2NNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRuMXZ0Cmhwdm5YTHZDbUc0TSs2eHRZdEQ [...]
         final int numMsg = 10;
 
-        Map<String, String> privateKeyFileMap = Maps.newHashMap();
+        Map<String, String> privateKeyFileMap = new HashMap<>();
         privateKeyFileMap.put("client-ecdsa.pem", ecdsaPrivateKeyFile);
         privateKeyFileMap.put("client-rsa.pem", rsaPrivateKeyFile);
-        Map<String, String> privateKeyDataMap = Maps.newHashMap();
+        Map<String, String> privateKeyDataMap = new HashMap<>();
         privateKeyDataMap.put("client-ecdsa.pem", ecdsaPrivateKeyData);
         privateKeyDataMap.put("client-rsa.pem", rsaPrivateKeyData);
 
@@ -759,7 +756,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
         }
 
         MessageImpl<byte[]> msg = null;
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         int index = 0;
 
         // read message till end.
@@ -815,7 +812,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
         }
 
         TopicMessageImpl<byte[]> msg = null;
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         int index = 0;
 
         // read message till end.
@@ -1214,7 +1211,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
         assertTrue(reader.hasMessageAvailable());
 
         // Read all messages the first time
-        Set<String> messageSetA = Sets.newHashSet();
+        Set<String> messageSetA = new HashSet<>();
         for (int i = 0; i < numOfMessage; i++) {
             Message<byte[]> message = reader.readNext();
             String receivedMessage = new String(message.getData());
@@ -1228,7 +1225,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
         reader.seek(RelativeTimeUtil.parseRelativeTimeInSeconds("-1m"));
 
         // Read all messages a second time after seek()
-        Set<String> messageSetB = Sets.newHashSet();
+        Set<String> messageSetB = new HashSet<>();
         for (int i = 0; i < numOfMessage; i++) {
             Message<byte[]> message = reader.readNext();
             String receivedMessage = new String(message.getData());
@@ -1262,7 +1259,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
         assertTrue(reader.hasMessageAvailable());
 
         // Read all messages the first time
-        Set<String> messageSetA = Sets.newHashSet();
+        Set<String> messageSetA = new HashSet<>();
         for (int i = 0; i < numOfMessage; i++) {
             Message<byte[]> message = reader.readNext();
             String receivedMessage = new String(message.getData());
@@ -1275,7 +1272,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
         reader.seek(RelativeTimeUtil.parseRelativeTimeInSeconds("-1m"));
 
         // Read all messages a second time after seek()
-        Set<String> messageSetB = Sets.newHashSet();
+        Set<String> messageSetB = new HashSet<>();
         for (int i = 0; i < numOfMessage; i++) {
             Message<byte[]> message = reader.readNext();
             String receivedMessage = new String(message.getData());
@@ -1311,7 +1308,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
 
         // Read all messages the first time
         MessageId midmessageToSeek = null;
-        Set<String> messageSetA = Sets.newHashSet();
+        Set<String> messageSetA = new HashSet<>();
         for (int i = 0; i < numOfMessage; i++) {
             Message<byte[]> message = reader.readNext();
             String receivedMessage = new String(message.getData());
@@ -1329,7 +1326,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
         reader.seek(midmessageToSeek);
 
         // Read all halved messages after seek()
-        Set<String> messageSetB = Sets.newHashSet();
+        Set<String> messageSetB = new HashSet<>();
         for (int i = halfMessages + 1; i < numOfMessage; i++) {
             Message<byte[]> message = reader.readNext();
             String receivedMessage = new String(message.getData());
@@ -1367,7 +1364,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
         int plusTime = (halfMessages + 1) * 100;
         reader.seek(l + plusTime);
 
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         for (int i = halfMessages + 1; i < numOfMessage; i++) {
             Message<byte[]> message = reader.readNext();
             String receivedMessage = new String(message.getData());
@@ -1397,7 +1394,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
         Reader<byte[]> reader = pulsarClient.newReader().topic(topicName).startMessageId(MessageId.earliest).create();
 
         reader.seek(halfTime);
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         for (int i = halfMessages + 1; i < numOfMessage; i++) {
             Message<byte[]> message = reader.readNext(10, TimeUnit.SECONDS);
             String receivedMessage = new String(message.getData());
@@ -1452,7 +1449,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
         }
 
         Reader<byte[]> reader = readerBuilder.create();
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         for (int i = firstMessage; i < numOfMessages; i++) {
             Message<byte[]> message = reader.readNext();
             String receivedMessage = new String(message.getData());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
index 4f9c9080646..097cf4823de 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
@@ -27,11 +27,12 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -138,7 +139,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
         }
 
         Message<String> msg = null;
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         for (int i = 0; i < 10; i++) {
             msg = consumer.receive(5, TimeUnit.SECONDS);
             String receivedMessage = msg.getValue();
@@ -168,7 +169,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
                 .enableBatching(batchMessageDelayMs != 0)
                 .create();
 
-        List<Future<MessageId>> futures = Lists.newArrayList();
+        List<Future<MessageId>> futures = new ArrayList<>();
 
         // Asynchronously produce messages
         for (int i = 0; i < 10; i++) {
@@ -183,7 +184,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
         }
 
         Message<byte[]> msg = null;
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         for (int i = 0; i < 10; i++) {
             msg = consumer.receive(5, TimeUnit.SECONDS);
             String receivedMessage = new String(msg.getData());
@@ -226,7 +227,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
                 .enableBatching(batchMessageDelayMs != 0)
                 .create();
 
-        List<Future<MessageId>> futures = Lists.newArrayList();
+        List<Future<MessageId>> futures = new ArrayList<>();
 
         // Asynchronously produce messages
         for (int i = 0; i < numMessages; i++) {
@@ -740,8 +741,8 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
         log.info("-- Starting {} test --", methodName);
 
         final int totalMsg = 100;
-        final Set<String> produceMsgs = Sets.newHashSet();
-        final Set<String> consumeMsgs = Sets.newHashSet();
+        final Set<String> produceMsgs = new HashSet<>();
+        final Set<String> consumeMsgs = new HashSet<>();
 
         Consumer<byte[]> consumer = pulsarClient.newConsumer()
                 .topic("persistent://my-property/use/my-ns/my-topic1")
@@ -784,8 +785,8 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
         log.info("-- Starting {} test --", methodName);
 
         final int totalMsg = 100;
-        final Set<String> produceMsgs = Sets.newHashSet();
-        final Set<String> consumeMsgs = Sets.newHashSet();
+        final Set<String> produceMsgs = new HashSet<>();
+        final Set<String> consumeMsgs = new HashSet<>();
 
         Consumer<byte[]> consumer = pulsarClient.newConsumer()
                 .topic("persistent://my-property/use/my-ns/my-topic1")
@@ -877,8 +878,8 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
         }
 
         Message<byte[]>msg = null;
-        Set<Message<byte[]>> consumerMsgSet1 = Sets.newHashSet();
-        Set<Message<byte[]>> consumerMsgSet2 = Sets.newHashSet();
+        Set<Message<byte[]>> consumerMsgSet1 = new HashSet<>();
+        Set<Message<byte[]>> consumerMsgSet2 = new HashSet<>();
         for (int i = 0; i < 5; i++) {
             msg = consumer1.receive();
             consumerMsgSet1.add(msg);
@@ -985,7 +986,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
 
             // (2) try to consume messages: but will be able to consume number of messages = unAckedMessagesBufferSize
             Message<byte[]>msg = null;
-            List<Message<byte[]>> messages = Lists.newArrayList();
+            List<Message<byte[]>> messages = new ArrayList<>();
             for (int i = 0; i < totalProducedMsgs; i++) {
                 msg = consumer.receive(1, TimeUnit.SECONDS);
                 if (msg != null) {
@@ -1073,7 +1074,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
             for (int j = 0; j < totalReceiveIteration; j++) {
 
                 Message<byte[]>msg = null;
-                List<Message<byte[]>> messages = Lists.newArrayList();
+                List<Message<byte[]>> messages = new ArrayList<>();
                 for (int i = 0; i < totalProducedMsgs; i++) {
                     msg = consumer.receive(1, TimeUnit.SECONDS);
                     if (msg != null) {
@@ -1155,7 +1156,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
             // (2) Consumer1: consume without ack:
             // try to consume messages: but will be able to consume number of messages = maxUnackedMessages
             Message<byte[]>msg = null;
-            List<Message<byte[]>> messages = Lists.newArrayList();
+            List<Message<byte[]>> messages = new ArrayList<>();
             for (int i = 0; i < totalProducedMsgs; i++) {
                 msg = consumer1.receive(1, TimeUnit.SECONDS);
                 if (msg != null) {
@@ -1309,7 +1310,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
 
             // (2) try to consume messages: but will be able to consume number of messages = unAckedMessagesBufferSize
             Message<byte[]>msg = null;
-            List<Message<byte[]>> messages = Lists.newArrayList();
+            List<Message<byte[]>> messages = new ArrayList<>();
             for (int i = 0; i < totalProducedMsgs; i++) {
                 msg = consumer.receive(1, TimeUnit.SECONDS);
                 if (msg != null) {
@@ -1375,7 +1376,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
                     .batchingMaxMessages(5)
                     .create();
 
-            List<CompletableFuture<MessageId>> futures = Lists.newArrayList();
+            List<CompletableFuture<MessageId>> futures = new ArrayList<>();
             // (1) Produced Messages
             for (int i = 0; i < totalProducedMsgs; i++) {
                 String message = "my-message-" + i;
@@ -1387,7 +1388,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
             // (2) Consumer1: consume without ack:
             // try to consume messages: but will be able to consume number of messages = maxUnackedMessages
             Message<byte[]>msg = null;
-            List<Message<byte[]>> messages = Lists.newArrayList();
+            List<Message<byte[]>> messages = new ArrayList<>();
             for (int i = 0; i < totalProducedMsgs; i++) {
                 msg = consumer1.receive(1, TimeUnit.SECONDS);
                 if (msg != null) {
@@ -1473,7 +1474,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
             // (2) Consumer1: consume without ack:
             // try to consume messages: but will be able to consume number of messages = maxUnackedMessages
             Message<byte[]>msg = null;
-            List<Message<byte[]>> messages = Lists.newArrayList();
+            List<Message<byte[]>> messages = new ArrayList<>();
             for (int i = 0; i < totalProducedMsgs; i++) {
                 msg = consumer1.receive(1, TimeUnit.SECONDS);
                 if (msg != null) {
@@ -1556,7 +1557,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
         }
 
         Message<byte[]>msg = null;
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         for (int i = 0; i < totalMsg; i++) {
             msg = consumer.receive(5, TimeUnit.SECONDS);
             String receivedMessage = new String(msg.getData());
@@ -1609,7 +1610,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
 
             // (2) try to consume messages: but will be able to consume number of messages = unAckedMessagesBufferSize
             Message<byte[]>msg = null;
-            List<Message<byte[]>> messages1 = Lists.newArrayList();
+            List<Message<byte[]>> messages1 = new ArrayList<>();
             for (int i = 0; i < totalProducedMsgs; i++) {
                 msg = consumer.receive(1, TimeUnit.SECONDS);
                 if (msg != null) {
@@ -1630,7 +1631,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
             consumer.redeliverUnacknowledgedMessages(Sets.newHashSet(redeliveryMessages));
             Thread.sleep(1000);
 
-            Set<MessageIdImpl> messages2 = Sets.newHashSet();
+            Set<MessageIdImpl> messages2 = new HashSet<>();
             for (int i = 0; i < totalProducedMsgs; i++) {
                 msg = consumer.receive(1, TimeUnit.SECONDS);
                 if (msg != null) {
@@ -1705,7 +1706,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
 
             // (2) try to consume messages: but will be able to consume number of messages = unAckedMessagesBufferSize
             Message<byte[]>msg = null;
-            List<Message<byte[]>> messages1 = Lists.newArrayList();
+            List<Message<byte[]>> messages1 = new ArrayList<>();
             for (int i = 0; i < totalProducedMsgs; i++) {
                 msg = consumer.receive(1, TimeUnit.SECONDS);
                 if (msg != null) {
@@ -1726,7 +1727,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
             consumer.redeliverUnacknowledgedMessages(Sets.newHashSet(redeliveryMessages));
             Thread.sleep(1000);
 
-            Set<MessageIdImpl> messages2 = Sets.newHashSet();
+            Set<MessageIdImpl> messages2 = new HashSet<>();
             for (int i = 0; i < totalProducedMsgs; i++) {
                 msg = consumer.receive(1, TimeUnit.SECONDS);
                 if (msg != null) {
@@ -1782,7 +1783,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
         Producer<byte[]> producer = pulsarClient.newProducer()
                 .topic("persistent://my-property/use/my-ns/my-topic2")
                 .create();
-        List<Future<MessageId>> futures = Lists.newArrayList();
+        List<Future<MessageId>> futures = new ArrayList<>();
 
         // Asynchronously produce messages
         for (int i = 0; i < 15; i++) {
@@ -1857,7 +1858,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
                 .topic("persistent://my-property/use/my-ns/my-topic2")
                 .enableBatching(false)
                 .create();
-        List<Future<MessageId>> futures = Lists.newArrayList();
+        List<Future<MessageId>> futures = new ArrayList<>();
 
         // Asynchronously produce messages
         final int totalPublishMessages = 500;
@@ -1872,7 +1873,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
             future.get();
         }
 
-        List<Message<byte[]>> messages = Lists.newArrayList();
+        List<Message<byte[]>> messages = new ArrayList<>();
 
         // let consumer1 and consumer2 cosume messages up to the queue will be full
         for (int i = 0; i < totalPublishMessages; i++) {
@@ -1985,7 +1986,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
         }
         // (1.a) consume first consumeMsgInParts msgs and trigger redeliver
         Message<byte[]> msg = null;
-        List<Message<byte[]>> messages1 = Lists.newArrayList();
+        List<Message<byte[]>> messages1 = new ArrayList<>();
         for (int i = 0; i < consumeMsgInParts; i++) {
             msg = consumer.receive(1, TimeUnit.SECONDS);
             if (msg != null) {
@@ -2121,7 +2122,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
 
         final int totalMsg = 10;
 
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         Consumer<byte[]> consumer = pulsarClient.newConsumer()
                 .topic("persistent://my-property/use/my-ns/myecdsa-topic1")
                 .subscriptionName("my-subscriber-name")
@@ -2197,7 +2198,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
 
         final int totalMsg = 10;
 
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         Consumer<byte[]> consumer = pulsarClient.newConsumer()
                 .topic("persistent://my-property/use/my-ns/myrsa-topic1")
                 .subscriptionName("my-subscriber-name")
@@ -2279,7 +2280,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
         final int totalMsg = 10;
 
         Message<String> msg = null;
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                 .topic("persistent://my-property/use/my-ns/myenc-topic1")
                 .subscriptionName("my-subscriber-name")
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AdminApiKeyStoreTlsAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AdminApiKeyStoreTlsAuthTest.java
index 22400e0ae64..04b05b36d0e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AdminApiKeyStoreTlsAuthTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AdminApiKeyStoreTlsAuthTest.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.client.impl;
 
 import static org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls.mapToString;
 import static org.testng.AssertJUnit.fail;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 import io.jsonwebtoken.SignatureAlgorithm;
 import java.util.HashMap;
@@ -172,9 +171,9 @@ public class AdminApiKeyStoreTlsAuthTest extends ProducerConsumerBase {
         try (PulsarAdmin admin = buildAdminClient()) {
             admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
             admin.tenants().createTenant("tenant1",
-                                         new TenantInfoImpl(ImmutableSet.of("foobar"),
-                                                        ImmutableSet.of("test")));
-            Assert.assertEquals(ImmutableSet.of("tenant1"), admin.tenants().getTenants());
+                                         new TenantInfoImpl(Set.of("foobar"),
+                                                        Set.of("test")));
+            Assert.assertEquals(Set.of("tenant1"), admin.tenants().getTenants());
         }
     }
 
@@ -183,8 +182,8 @@ public class AdminApiKeyStoreTlsAuthTest extends ProducerConsumerBase {
         try (PulsarAdmin admin = buildAdminClient()) {
             admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
             admin.tenants().createTenant("tenant1",
-                                         new TenantInfoImpl(ImmutableSet.of("proxy"),
-                                                        ImmutableSet.of("test")));
+                                         new TenantInfoImpl(Set.of("proxy"),
+                                                        Set.of("test")));
             admin.namespaces().createNamespace("tenant1/ns1");
             Assert.assertTrue(admin.namespaces().getNamespaces("tenant1").contains("tenant1/ns1"));
         }
@@ -195,12 +194,12 @@ public class AdminApiKeyStoreTlsAuthTest extends ProducerConsumerBase {
         try (PulsarAdmin admin = buildAdminClient()) {
             admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
             admin.tenants().createTenant("tenant1",
-                                         new TenantInfoImpl(ImmutableSet.of("proxy", "user1"),
-                                                        ImmutableSet.of("test")));
+                                         new TenantInfoImpl(Set.of("proxy", "user1"),
+                                                        Set.of("test")));
             admin.namespaces().createNamespace("tenant1/ns1");
         }
         WebTarget root = buildWebClient();
-        Assert.assertEquals(ImmutableSet.of("tenant1/ns1"),
+        Assert.assertEquals(Set.of("tenant1/ns1"),
                             root.path("/admin/v2/namespaces").path("tenant1")
                             .request(MediaType.APPLICATION_JSON)
                             .header("X-Original-Principal", "user1")
@@ -214,9 +213,9 @@ public class AdminApiKeyStoreTlsAuthTest extends ProducerConsumerBase {
         try (PulsarAdmin admin = buildAdminClient()) {
             admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
             admin.tenants().createTenant("tenant1",
-                    new TenantInfoImpl(ImmutableSet.of("foobar"),
-                            ImmutableSet.of("test")));
-            Assert.assertEquals(ImmutableSet.of("tenant1"), admin.tenants().getTenants());
+                    new TenantInfoImpl(Set.of("foobar"),
+                            Set.of("test")));
+            Assert.assertEquals(Set.of("tenant1"), admin.tenants().getTenants());
 
             admin.namespaces().createNamespace("tenant1/ns1");
 
@@ -259,7 +258,7 @@ public class AdminApiKeyStoreTlsAuthTest extends ProducerConsumerBase {
 
         admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
         admin.tenants().createTenant("tenant1",
-                new TenantInfoImpl(ImmutableSet.of("foobar"),
-                        ImmutableSet.of("test")));
+                new TenantInfoImpl(Set.of("foobar"),
+                        Set.of("test")));
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index a8c3d353e84..e3af761429a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -37,7 +37,6 @@ import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import io.netty.buffer.ByteBuf;
 import java.io.InputStream;
@@ -46,6 +45,7 @@ import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.security.GeneralSecurityException;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.NavigableMap;
 import java.util.Optional;
@@ -367,7 +367,7 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase {
             String message = "my-message-" + i;
             lastNonBatchedMessageId = producer.send(message.getBytes());
         }
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         Message<byte[]> msg = null;
         for (int i = 0; i < numMessagesPerBatch; i++) {
             msg = consumer1.receive(1, TimeUnit.SECONDS);
@@ -563,7 +563,7 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase {
                 ClientCnx cnx = producer.cnx();
                 assertTrue(cnx.channel().isActive());
 
-                final List<CompletableFuture<Producer<byte[]>>> futures = Lists.newArrayList();
+                final List<CompletableFuture<Producer<byte[]>>> futures = new ArrayList<>();
                 final int totalProducers = 10;
                 CountDownLatch latch = new CountDownLatch(totalProducers);
                 for (int i = 0; i < totalProducers; i++) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
index 9fbea7f2914..3d8b18c52e6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
@@ -19,10 +19,10 @@
 package org.apache.pulsar.client.impl;
 
 import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
-import com.google.common.collect.Lists;
 import io.netty.channel.EventLoopGroup;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.IntStream;
@@ -63,7 +63,7 @@ public class ConnectionPoolTest extends MockedPulsarServiceBaseTest {
         conf.setServiceUrl(serviceUrl);
         PulsarClientImpl client = new PulsarClientImpl(conf, eventLoop, pool);
 
-        List<InetSocketAddress> result = Lists.newArrayList();
+        List<InetSocketAddress> result = new ArrayList<>();
         result.add(new InetSocketAddress("127.0.0.1", brokerPort));
         Mockito.when(pool.resolveName(InetSocketAddress.createUnresolved("non-existing-dns-name",
                 brokerPort)))
@@ -83,7 +83,7 @@ public class ConnectionPoolTest extends MockedPulsarServiceBaseTest {
         conf.setServiceUrl(serviceUrl);
         PulsarClientImpl client = new PulsarClientImpl(conf, eventLoop, pool);
 
-        List<InetSocketAddress> result = Lists.newArrayList();
+        List<InetSocketAddress> result = new ArrayList<>();
 
         // Add a non existent IP to the response to check that we're trying the 2nd address as well
         result.add(new InetSocketAddress("127.0.0.99", brokerPort));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java
index 36d3bf6c440..d284853050e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java
@@ -18,8 +18,17 @@
  */
 package org.apache.pulsar.client.impl;
 
-import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import lombok.Cleanup;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.pulsar.client.api.Consumer;
@@ -35,15 +44,6 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
 @Test(groups = "broker-impl")
 public class KeySharedSubscriptionTest extends ProducerConsumerBase {
@@ -75,7 +75,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
                 .build();
         final int totalMsg = 1000;
         String topic = "broker-close-test-" + RandomStringUtils.randomAlphabetic(5);
-        Map<Consumer<?>, List<MessageId>> nameToId = Maps.newConcurrentMap();
+        Map<Consumer<?>, List<MessageId>> nameToId = new ConcurrentHashMap<>();
         Set<MessageId> pubMessages = Sets.newConcurrentHashSet();
         Set<MessageId> recMessages = Sets.newConcurrentHashSet();
         AtomicLong lastActiveTime = new AtomicLong();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
index 783e971f391..2a7b8fe8d98 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
@@ -23,12 +23,12 @@ import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
 import java.util.Set;
@@ -137,7 +137,7 @@ public class MessageChunkingTest extends ProducerConsumerBase {
 
         PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topicName).get().get();
 
-        List<String> publishedMessages = Lists.newArrayList();
+        List<String> publishedMessages = new ArrayList<>();
         for (int i = 0; i < totalMessages; i++) {
             String message = createMessagePayload(i * 100);
             publishedMessages.add(message);
@@ -145,8 +145,8 @@ public class MessageChunkingTest extends ProducerConsumerBase {
         }
 
         Message<byte[]> msg = null;
-        Set<String> messageSet = Sets.newHashSet();
-        List<MessageId> msgIds = Lists.newArrayList();
+        Set<String> messageSet = new HashSet<>();
+        List<MessageId> msgIds = new ArrayList<>();
         for (int i = 0; i < totalMessages; i++) {
             msg = consumer.receive(5, TimeUnit.SECONDS);
             String receivedMessage = new String(msg.getData());
@@ -227,7 +227,7 @@ public class MessageChunkingTest extends ProducerConsumerBase {
 
         PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topicName).get().get();
 
-        List<String> publishedMessages = Lists.newArrayList();
+        List<String> publishedMessages = new ArrayList<>();
         for (int i = 0; i < totalMessages; i++) {
             String message = createMessagePayload(i * 100);
             publishedMessages.add(message);
@@ -235,7 +235,7 @@ public class MessageChunkingTest extends ProducerConsumerBase {
         }
 
         Message<byte[]> msg = null;
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         for (int i = 0; i < totalMessages; i++) {
             msg = reader.readNext(5, TimeUnit.SECONDS);
             String receivedMessage = new String(msg.getData());
@@ -328,7 +328,7 @@ public class MessageChunkingTest extends ProducerConsumerBase {
 
         Producer<byte[]>[] producers = new Producer[totalProducers];
         int totalPublishedMessages = totalProducers;
-        List<CompletableFuture<MessageId>> futures = Lists.newArrayList();
+        List<CompletableFuture<MessageId>> futures = new ArrayList<>();
         for (int i = 0; i < totalProducers; i++) {
             producers[i] = producerBuilder.enableChunking(true).enableBatching(false).create();
             int index = i;
@@ -341,7 +341,7 @@ public class MessageChunkingTest extends ProducerConsumerBase {
         PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topicName).get().get();
 
         Message<byte[]> msg = null;
-        Set<String> messageSet = Sets.newHashSet();
+        Set<String> messageSet = new HashSet<>();
         for (int i = 0; i < totalMessages; i++) {
             msg = consumer.receive(1, TimeUnit.SECONDS);
             if (msg == null) {
@@ -501,7 +501,7 @@ public class MessageChunkingTest extends ProducerConsumerBase {
         }
 
         Message<byte[]> msg = null;
-        List<MessageId> msgIds = Lists.newArrayList();
+        List<MessageId> msgIds = new ArrayList<>();
         for (int i = 0; i < totalMessages; i++) {
             msg = consumer1.receive(5, TimeUnit.SECONDS);
             String receivedMessage = new String(msg.getData());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageParserTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageParserTest.java
index 95cfdd9f10c..92c2d767098 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageParserTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageParserTest.java
@@ -19,16 +19,12 @@
 package org.apache.pulsar.client.impl;
 
 import static org.testng.Assert.assertEquals;
-
-import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
-
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
-
 import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -38,12 +34,12 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.api.raw.MessageParser;
 import org.apache.pulsar.common.api.raw.RawMessage;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.protocol.Commands;
 import org.awaitility.Awaitility;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -116,7 +112,7 @@ public class MessageParserTest extends MockedPulsarServiceBaseTest {
         if (batchEnabled) {
             Entry entry = cursor.readEntriesOrWait(1).get(0);
 
-            List<RawMessage> messages = Lists.newArrayList();
+            List<RawMessage> messages = new ArrayList<>();
             ByteBuf headsAndPayload = entry.getDataBuffer();
 
             try {
@@ -147,8 +143,8 @@ public class MessageParserTest extends MockedPulsarServiceBaseTest {
             List<Entry> entries = cursor.readEntriesOrWait(n);
             assertEquals(entries.size(), n);
 
-            List<ByteBuf> headsAndPayloadList = Lists.newArrayList();
-            List<RawMessage> messages = Lists.newArrayList();
+            List<ByteBuf> headsAndPayloadList = new ArrayList<>();
+            List<RawMessage> messages = new ArrayList<>();
             for (Entry entry : entries) {
                 ByteBuf headsAndPayload = entry.getDataBuffer();
                 headsAndPayloadList.add(headsAndPayload);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
index edb5f0cd88d..2974c39744f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
@@ -23,7 +23,6 @@ import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
@@ -259,7 +258,7 @@ public class MultiTopicsReaderTest extends MockedPulsarServiceBaseTest {
         Reader<byte[]> reader = pulsarClient.newReader().topic(topic)
                 .startMessageFromRollbackDuration(2, TimeUnit.HOURS).create();
 
-        List<MessageId> receivedMessageIds = Lists.newArrayList();
+        List<MessageId> receivedMessageIds = new ArrayList<>();
 
         while (reader.hasMessageAvailable()) {
             Message<byte[]> msg = reader.readNext(1, TimeUnit.SECONDS);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
index 53ae8f97008..9c6407ac712 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
@@ -23,7 +23,6 @@ import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -300,7 +299,7 @@ public class ReaderTest extends MockedPulsarServiceBaseTest {
         Reader<byte[]> reader = pulsarClient.newReader().topic(topic)
                 .startMessageFromRollbackDuration(2, TimeUnit.HOURS).create();
 
-        List<MessageId> receivedMessageIds = Lists.newArrayList();
+        List<MessageId> receivedMessageIds = new ArrayList<>();
 
         while (reader.hasMessageAvailable()) {
             Message<byte[]> msg = reader.readNext(1, TimeUnit.SECONDS);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index 8d068d65114..d3702d04e06 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -271,7 +271,7 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase {
         assertTrue(consumer instanceof MultiTopicsConsumerImpl);
 
         // Asynchronously produce messages
-        List<Future<MessageId>> futures = Lists.newArrayList();
+        List<Future<MessageId>> futures = new ArrayList<>();
         for (int i = 0; i < totalMessages / 3; i++) {
             futures.add(producer1.sendAsync((messagePredicate + "producer1-" + i).getBytes()));
             futures.add(producer2.sendAsync((messagePredicate + "producer2-" + i).getBytes()));
@@ -697,7 +697,7 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase {
 
         try {
             pulsarClient.newConsumer()
-                .topics(Lists.newArrayList())
+                .topics(new ArrayList<>())
                 .subscriptionName(subscriptionName)
                 .subscriptionType(SubscriptionType.Shared)
                 .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java
index 8a66fa75353..73ebd56e2af 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.client.impl;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
-
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -34,9 +33,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
-
-import com.google.common.collect.Lists;
-
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
@@ -156,7 +152,7 @@ public class ZeroQueueSizeTest extends BrokerTestBase {
             .create();
 
         // 3. Create Consumer
-        List<Message<byte[]>> messages = Lists.newArrayList();
+        List<Message<byte[]>> messages = new ArrayList<>();
         CountDownLatch latch = new CountDownLatch(totalMessages);
         ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName)
                 .subscriptionName(subscriptionName).receiverQueueSize(0).messageListener((cons, msg) -> {