You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zi...@apache.org on 2022/10/10 03:29:52 UTC
[pulsar] branch master updated: [cleanup][broker][Modernizer] fix violations in pulsar-broker (#17968)
This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 16cceaee6aa [cleanup][broker][Modernizer] fix violations in pulsar-broker (#17968)
16cceaee6aa is described below
commit 16cceaee6aa6aca137398abf5024e55ef5c80b7a
Author: youzipi <bl...@qq.com>
AuthorDate: Mon Oct 10 11:29:43 2022 +0800
[cleanup][broker][Modernizer] fix violations in pulsar-broker (#17968)
* [cleanup][broker][Modernizer] fix violations in pulsar-broker
---
pulsar-broker/pom.xml | 3 -
.../broker/intercept/InterceptFilterOutTest.java | 3 +-
.../broker/web/ProcessHandlerFilterTest.java | 3 +-
.../api/AuthenticatedProducerConsumerTest.java | 10 +--
.../AuthenticationTlsHostnameVerificationTest.java | 3 +-
.../api/AuthorizationProducerConsumerTest.java | 2 +-
.../pulsar/client/api/BrokerServiceLookupTest.java | 11 +--
.../pulsar/client/api/ConsumerRedeliveryTest.java | 16 ++---
.../client/api/DispatcherBlockConsumerTest.java | 27 +++----
.../client/api/KeySharedSubscriptionTest.java | 6 +-
.../client/api/MessageDispatchThrottlingTest.java | 9 +--
.../client/api/MutualAuthenticationTest.java | 2 +-
.../pulsar/client/api/NonPersistentTopicTest.java | 15 ++--
.../api/PartitionedProducerConsumerTest.java | 24 +++----
.../client/api/PatternMultiTopicsConsumerTest.java | 14 ++--
.../apache/pulsar/client/api/RetryTopicTest.java | 8 +--
.../client/api/SimpleProducerConsumerStatTest.java | 30 ++++----
.../client/api/SimpleProducerConsumerTest.java | 82 +++++++++++-----------
.../api/SimpleTypedProducerConsumerTest.java | 14 ++--
.../TokenAuthenticatedProducerConsumerTest.java | 2 +-
...kenOauth2AuthenticatedProducerConsumerTest.java | 6 +-
.../apache/pulsar/client/api/TopicReaderTest.java | 57 +++++++--------
.../client/api/v1/V1_ProducerConsumerTest.java | 61 ++++++++--------
.../client/impl/AdminApiKeyStoreTlsAuthTest.java | 27 ++++---
.../client/impl/BrokerClientIntegrationTest.java | 6 +-
.../pulsar/client/impl/ConnectionPoolTest.java | 6 +-
.../client/impl/KeySharedSubscriptionTest.java | 22 +++---
.../pulsar/client/impl/MessageChunkingTest.java | 20 +++---
.../pulsar/client/impl/MessageParserTest.java | 16 ++---
.../pulsar/client/impl/MultiTopicsReaderTest.java | 3 +-
.../org/apache/pulsar/client/impl/ReaderTest.java | 3 +-
.../pulsar/client/impl/TopicsConsumerImplTest.java | 4 +-
.../pulsar/client/impl/ZeroQueueSizeTest.java | 6 +-
33 files changed, 246 insertions(+), 275 deletions(-)
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index ca36f9ca371..a8e39ef7d24 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -436,9 +436,6 @@
<configuration>
<failOnViolations>true</failOnViolations>
<javaVersion>17</javaVersion>
- <ignorePackages>
- <ignorePackage>org.apache.pulsar.client</ignorePackage>
- </ignorePackages>
<exclusionPatterns>
<exclusionPattern>java/lang/StringBuffer."<init>":.*</exclusionPattern>
</exclusionPatterns>
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/InterceptFilterOutTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/InterceptFilterOutTest.java
index 3d8c4732d2a..42305912baa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/InterceptFilterOutTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/InterceptFilterOutTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.intercept;
+import java.util.HashSet;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.web.ExceptionHandler;
@@ -151,7 +152,7 @@ public class InterceptFilterOutTest {
Mockito.doReturn(interceptor).when(pulsarService).getBrokerInterceptor();
ServiceConfiguration conf = Mockito.mock(ServiceConfiguration.class);
// Disable the broker interceptor
- Mockito.doReturn(Sets.newHashSet()).when(conf).getBrokerInterceptors();
+ Mockito.doReturn(new HashSet<>()).when(conf).getBrokerInterceptors();
Mockito.doReturn(conf).when(pulsarService).getConfig();
ResponseHandlerFilter filter = new ResponseHandlerFilter(pulsarService);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/ProcessHandlerFilterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/ProcessHandlerFilterTest.java
index 6de49d9aa56..d6ade32ba6d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/ProcessHandlerFilterTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/ProcessHandlerFilterTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.web;
import java.io.IOException;
+import java.util.HashSet;
import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
@@ -58,7 +59,7 @@ public class ProcessHandlerFilterTest {
FilterChain spyFilterChain = Mockito.spy(FilterChain.class);
Mockito.doReturn(spyInterceptor).when(mockPulsarService).getBrokerInterceptor();
Mockito.doReturn(mockConfig).when(mockPulsarService).getConfig();
- Mockito.doReturn(Sets.newHashSet()).when(mockConfig).getBrokerInterceptors();
+ Mockito.doReturn(new HashSet<>()).when(mockConfig).getBrokerInterceptors();
// empty interceptor list
HttpServletRequest mockHttpServletRequest = Mockito.mock(HttpServletRequest.class);
ProcessHandlerFilter processHandlerFilter = new ProcessHandlerFilter(mockPulsarService);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
index 02791168210..e635de4acd2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
@@ -170,7 +170,7 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
}
Message<byte[]> msg = null;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
for (int i = 0; i < 10; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
@@ -215,7 +215,7 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
admin.tenants().createTenant("my-property",
- new TenantInfoImpl(Sets.newHashSet(), Sets.newHashSet("test")));
+ new TenantInfoImpl(new HashSet<>(), Sets.newHashSet("test")));
admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
testSyncProducerAndConsumer(batchMessageDelayMs);
@@ -233,7 +233,7 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
admin.tenants().createTenant("my-property",
- new TenantInfoImpl(Sets.newHashSet(), Sets.newHashSet("test")));
+ new TenantInfoImpl(new HashSet<>(), Sets.newHashSet("test")));
admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
testSyncProducerAndConsumer(batchMessageDelayMs);
@@ -344,7 +344,7 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
String topic = "persistent://" + namespace + "1/topic1";
// this will cause NPE and it should throw 500
mockZooKeeperGlobal.setAlwaysFail(Code.SESSIONEXPIRED);
- pulsar.getConfiguration().setSuperUserRoles(Sets.newHashSet());
+ pulsar.getConfiguration().setSuperUserRoles(new HashSet<>());
try {
admin.topics().getPartitionedTopicMetadata(topic);
} catch (PulsarAdminException e) {
@@ -449,7 +449,7 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
internalSetup(new AuthenticationToken(ADMIN_TOKEN));
admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
admin.tenants().createTenant("my-property",
- new TenantInfoImpl(Sets.newHashSet(), Sets.newHashSet("test")));
+ new TenantInfoImpl(new HashSet<>(), Sets.newHashSet("test")));
admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
@Cleanup
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java
index c46058a4919..ba061fc40ba 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.client.api;
-import com.google.common.collect.Sets;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.HashSet;
@@ -206,7 +205,7 @@ public class AuthenticationTlsHostnameVerificationTest extends ProducerConsumerB
}
Message<byte[]> msg = null;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
for (int i = 0; i < 10; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index f7e6594576e..dde958d518a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -952,7 +952,7 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
public static class TestAuthorizationProviderWithGrantPermission extends TestAuthorizationProvider {
- private Set<String> grantRoles = Sets.newHashSet();
+ private Set<String> grantRoles = new HashSet<>();
static AuthenticationDataSource authenticationData;
static String authDataJson;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index d1aa9c5bc8c..42a97bf0fcb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -46,6 +46,7 @@ import java.security.PrivateKey;
import java.security.SecureRandom;
import java.security.cert.Certificate;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -187,7 +188,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
}
Message<byte[]> msg = null;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
for (int i = 0; i < 10; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
@@ -314,7 +315,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
}
Message<byte[]> msg = null;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
for (int i = 0; i < 10; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
@@ -392,7 +393,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
}
Message<byte[]> msg = null;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
for (int i = 0; i < 20; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
assertNotNull(msg, "Message should not be null");
@@ -506,11 +507,11 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
con.connect();
log.info("connected url: {} ", con.getURL());
// assert connect-url: broker2-https
- assertEquals(new Integer(con.getURL().getPort()), conf2.getWebServicePortTls().get());
+ assertEquals(Integer.valueOf(con.getURL().getPort()), conf2.getWebServicePortTls().get());
InputStream is = con.getInputStream();
// assert redirect-url: broker1-https only
log.info("redirected url: {}", con.getURL());
- assertEquals(new Integer(con.getURL().getPort()), conf.getWebServicePortTls().get());
+ assertEquals(Integer.valueOf(con.getURL().getPort()), conf.getWebServicePortTls().get());
is.close();
loadManager1 = null;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
index 95e343acfef..3d931346c89 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
@@ -18,16 +18,19 @@
*/
package org.apache.pulsar.client.api;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-
import lombok.Cleanup;
-
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.util.FutureUtil;
@@ -38,13 +41,6 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
-import com.google.common.collect.Sets;
-
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertNull;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.assertEquals;
-
@Test(groups = "broker-api")
public class ConsumerRedeliveryTest extends ProducerConsumerBase {
@@ -105,7 +101,7 @@ public class ConsumerRedeliveryTest extends ProducerConsumerBase {
int consumedCount = 0;
- Set<MessageId> messageIds = Sets.newHashSet();
+ Set<MessageId> messageIds = new HashSet<>();
for (int i = 0; i < totalMsgs; i++) {
Message<byte[]> message = consumer1.receive(5, TimeUnit.SECONDS);
if (message != null && (consumedCount % 2) == 0) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
index 97dd7760519..6c5ac0460d5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
@@ -26,12 +26,13 @@ import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Queues;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -127,7 +128,7 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
// (2) try to consume messages: but will be able to consume number of messages = unackMsgAllowed
Message<?> msg = null;
- Map<Message<?>, Consumer<?>> messages = Maps.newHashMap();
+ Map<Message<?>, Consumer<?>> messages = new HashMap<>();
for (int i = 0; i < 3; i++) {
for (int j = 0; j < totalProducedMsgs; j++) {
msg = consumers.get(i).receive(500, TimeUnit.MILLISECONDS);
@@ -337,7 +338,7 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
// (2) try to consume messages: but will be able to consume number of messages = unackMsgAllowed
Message<?> msg = null;
- Map<Message<?>, Consumer<?>> messages = Maps.newHashMap();
+ Map<Message<?>, Consumer<?>> messages = new HashMap<>();
for (int i = 0; i < totalProducedMsgs; i++) {
msg = consumer1.receive(500, TimeUnit.MILLISECONDS);
if (msg != null) {
@@ -361,7 +362,7 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
.subscriptionType(SubscriptionType.Shared)
.acknowledgmentGroupTime(0, TimeUnit.SECONDS)
.subscribe();
- Map<Message<?>, Consumer<?>> messages2 = Maps.newHashMap();
+ Map<Message<?>, Consumer<?>> messages2 = new HashMap<>();
// try to consume remaining messages: broker may take time to deliver so, retry multiple time to consume
// all messages
for (int i = 0; i < totalProducedMsgs; i++) {
@@ -422,7 +423,7 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
// (2) try to consume messages: but will be able to consume number of messages = unackMsgAllowed
Message<?> msg = null;
- Set<MessageId> messages = Sets.newHashSet();
+ Set<MessageId> messages = new HashSet<>();
for (int i = 0; i < 3; i++) {
for (int j = 0; j < totalProducedMsgs; j++) {
msg = consumers.get(i).receive(500, TimeUnit.MILLISECONDS);
@@ -448,12 +449,12 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
Thread.sleep(1000);
// now, broker must have redelivered all unacked messages
- Map<ConsumerImpl<?>, Set<MessageId>> messages1 = Maps.newHashMap();
+ Map<ConsumerImpl<?>, Set<MessageId>> messages1 = new HashMap<>();
for (int i = 0; i < 3; i++) {
for (int j = 0; j < totalProducedMsgs; j++) {
msg = consumers.get(i).receive(500, TimeUnit.MILLISECONDS);
if (msg != null) {
- messages1.putIfAbsent(consumers.get(i), Sets.newHashSet());
+ messages1.putIfAbsent(consumers.get(i), new HashSet<>());
messages1.get(consumers.get(i)).add(msg.getMessageId());
log.info("Received message: " + new String(msg.getData()));
} else {
@@ -462,7 +463,7 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
}
}
- Set<MessageId> result = Sets.newHashSet();
+ Set<MessageId> result = new HashSet<>();
messages1.values().forEach(result::addAll);
// check all unacked messages have been redelivered
@@ -644,7 +645,7 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
// consumer should only receive unakced messages
Set<String> unackMsgs = unackMessages.stream().map(i -> "my-message-" + i).collect(Collectors.toSet());
- Set<String> receivedMsgs = Sets.newHashSet();
+ Set<String> receivedMsgs = new HashSet<>();
for (int i = 0; i < totalProducedMsgs; i++) {
Message<?> msg = consumer.receive(500, TimeUnit.MILLISECONDS);
if (msg == null) {
@@ -745,7 +746,7 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
* maxUnAckPerBroker limit
***/
Message<byte[]> msg = null;
- Set<MessageId> messages1 = Sets.newHashSet();
+ Set<MessageId> messages1 = new HashSet<>();
for (int j = 0; j < totalProducedMsgs; j++) {
msg = consumer1Sub1.receive(waitMills, TimeUnit.MILLISECONDS);
if (msg != null) {
@@ -793,7 +794,7 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
ConsumerImpl<byte[]> consumerSub2 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName)
.subscriptionName(subscriberName2).receiverQueueSize(receiverQueueSize)
.subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
- Set<MessageId> messages2 = Sets.newHashSet();
+ Set<MessageId> messages2 = new HashSet<>();
for (int j = 0; j < totalProducedMsgs; j++) {
msg = consumerSub2.receive(waitMills, TimeUnit.MILLISECONDS);
if (msg != null) {
@@ -954,7 +955,7 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
* maxUnAckPerBroker limit
***/
Message<?> msg = null;
- Set<MessageId> messages1 = Sets.newHashSet();
+ Set<MessageId> messages1 = new HashSet<>();
for (int j = 0; j < totalProducedMsgs; j++) {
msg = consumer1Sub1.receive(100, TimeUnit.MILLISECONDS);
if (msg != null) {
@@ -1006,7 +1007,7 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
.acknowledgmentGroupTime(0, TimeUnit.SECONDS)
.subscriptionName(subscriberName2).receiverQueueSize(receiverQueueSize)
.subscriptionType(SubscriptionType.Shared).subscribe();
- Set<MessageId> messages2 = Sets.newHashSet();
+ Set<MessageId> messages2 = new HashSet<>();
for (int j = 0; j < totalProducedMsgs; j++) {
msg = consumer1Sub2.receive(100, TimeUnit.MILLISECONDS);
if (msg != null) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 8742b8798b4..e2198deb294 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -24,7 +24,6 @@ import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
-import com.google.common.collect.Sets;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
@@ -32,6 +31,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -1320,7 +1320,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
.compareTo(lastMessageForKey.get(key).getValue()) > 0);
}
lastMessageForKey.put(key, message);
- consumerKeys.putIfAbsent(check.getKey(), Sets.newHashSet());
+ consumerKeys.putIfAbsent(check.getKey(), new HashSet<>());
consumerKeys.get(check.getKey()).add(key);
received++;
}
@@ -1353,7 +1353,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
Assert.assertNull(noMessages, "redeliver too many messages.");
Assert.assertEquals((check.getValue() + redeliveryCount), received);
}
- Set<String> allKeys = Sets.newHashSet();
+ Set<String> allKeys = new HashSet<>();
consumerKeys.forEach((k, v) -> v.forEach(key -> {
assertTrue(allKeys.add(key),
"Key "+ key + "is distributed to multiple consumers." );
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
index 23b4ea4f381..d024d43eb73 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
@@ -25,12 +25,7 @@ import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
-
-import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-
-import lombok.Cleanup;
-
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.HashMap;
@@ -42,7 +37,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-
+import lombok.Cleanup;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.cache.PendingReadsManager;
@@ -158,7 +153,7 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
Assert.assertTrue(isDispatchRateUpdate);
Assert.assertEquals(admin.namespaces().getDispatchRate(namespace), dispatchRate);
Policies policies = admin.namespaces().getPolicies(namespace);
- Map<String, DispatchRate> dispatchRateMap = Maps.newHashMap();
+ Map<String, DispatchRate> dispatchRateMap = new HashMap<>();
dispatchRateMap.put("test", dispatchRate);
Assert.assertEquals(policies.clusterDispatchRate, dispatchRateMap);
Assert.assertEquals(policies.topicDispatchRate, dispatchRateMap);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
index 0dc5a1f6c44..6dc901d53e2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
@@ -227,7 +227,7 @@ public class MutualAuthenticationTest extends ProducerConsumerBase {
producer.send(message.getBytes());
}
Message<byte[]> msg = null;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
for (int i = 0; i < 10; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
index 67bc7c0a309..677cca30c7b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
@@ -29,6 +29,7 @@ import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.net.URL;
+import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
@@ -175,7 +176,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
producer.flush();
Message<?> msg = null;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
for (int i = 0; i < totalProduceMsg; i++) {
msg = consumer.receive(1, TimeUnit.SECONDS);
if (msg != null) {
@@ -218,7 +219,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
producer.flush();
Message<?> msg = null;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
for (int i = 0; i < totalProduceMsg; i++) {
msg = consumer.receive(1, TimeUnit.SECONDS);
if (msg != null) {
@@ -274,7 +275,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
producer.flush();
Message<?> msg = null;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
for (int i = 0; i < totalProduceMsg; i++) {
msg = consumer.receive(1, TimeUnit.SECONDS);
if (msg != null) {
@@ -316,7 +317,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
producer.flush();
Message<?> msg = null;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
for (int i = 0; i < totalProduceMsg; i++) {
msg = consumer.receive(1, TimeUnit.SECONDS);
if (msg != null) {
@@ -373,7 +374,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
latch.await();
Message<?> msg = null;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
for (int i = 0; i < totalProduceMessages; i++) {
msg = consumer.receive(500, TimeUnit.MILLISECONDS);
if (msg != null) {
@@ -427,7 +428,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
// consume from shared-subscriptions
Message<?> msg = null;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
for (int i = 0; i < totalProduceMsg; i++) {
msg = consumer1Shared.receive(500, TimeUnit.MILLISECONDS);
if (msg != null) {
@@ -598,7 +599,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
// (1) consume by consumer1
Message<?> msg = null;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
for (int i = 0; i < totalProducedMessages; i++) {
msg = consumer1.receive(300, TimeUnit.MILLISECONDS);
if (msg != null) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
index 5f229dd580e..472f08fa962 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
@@ -22,10 +22,11 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
-
import io.netty.util.Timeout;
+import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -35,7 +36,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
-
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.PartitionedProducerImpl;
@@ -50,10 +50,6 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import com.google.common.collect.Sets;
-
-import io.netty.util.concurrent.DefaultThreadFactory;
-
@Test(groups = "broker-api")
public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(PartitionedProducerConsumerTest.class);
@@ -100,7 +96,7 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
}
Message<byte[]> msg;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
for (int i = 0; i < 10; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
Assert.assertNotNull(msg, "Message should not be null");
@@ -166,7 +162,7 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
}
Message<byte[]> msg;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
for (int i = 0; i < MESSAGE_COUNT; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
@@ -211,7 +207,7 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
}
Message<byte[]> msg;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
for (int i = 0; i < 10; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
@@ -258,7 +254,7 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
producer.newMessage().key(dummyKey2).value(message.getBytes()).send();
}
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
for (int i = 0; i < 10; i++) {
Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
Assert.assertNotNull(msg, "Message should not be null");
@@ -519,8 +515,8 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
log.info("-- Starting {} test --", methodName);
final int totalMsg = 100;
- final Set<String> produceMsgs = Sets.newHashSet();
- final Set<String> consumeMsgs = Sets.newHashSet();
+ final Set<String> produceMsgs = new HashSet<>();
+ final Set<String> consumeMsgs = new HashSet<>();
int numPartitions = 4;
TopicName topicName = TopicName
@@ -569,8 +565,8 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
final int totalMsg = 100;
- final Set<String> produceMsgs = Sets.newHashSet();
- final Set<String> consumeMsgs = Sets.newHashSet();
+ final Set<String> produceMsgs = new HashSet<>();
+ final Set<String> consumeMsgs = new HashSet<>();
int numPartitions = 4;
TopicName topicName = TopicName
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternMultiTopicsConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternMultiTopicsConsumerTest.java
index a78b804b9b1..dc8cecdadc4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternMultiTopicsConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternMultiTopicsConsumerTest.java
@@ -18,17 +18,17 @@
*/
package org.apache.pulsar.client.api;
-import com.google.common.collect.Maps;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
@Test(groups = "broker")
public class PatternMultiTopicsConsumerTest extends ProducerConsumerBase {
@@ -71,7 +71,7 @@ public class PatternMultiTopicsConsumerTest extends ProducerConsumerBase {
}
private void testWithConsumer(Consumer<byte[]> consumer) throws Exception {
- Map<String, List<String>> sentMessages = Maps.newHashMap();
+ Map<String, List<String>> sentMessages = new HashMap<>();
for (int p = 0; p < 10; ++p) {
String name = "persistent://my-property/my-ns/topic-" + p;
Producer<byte[]> producer = pulsarClient.newProducer().topic(name).create();
@@ -83,7 +83,7 @@ public class PatternMultiTopicsConsumerTest extends ProducerConsumerBase {
}
Message<byte[]> msg;
- Map<String, List<String>> receivedMessages = Maps.newHashMap();
+ Map<String, List<String>> receivedMessages = new HashMap<>();
for (int i = 0; i < 100; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
index fecf545a678..5f02219aac3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
@@ -21,9 +21,9 @@ package org.apache.pulsar.client.api;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.fail;
-import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -158,14 +158,14 @@ public class RetryTopicTest extends ProducerConsumerBase {
.topic(topic)
.create();
- Set<String> originMessageIds = Sets.newHashSet();
+ Set<String> originMessageIds = new HashSet<>();
for (int i = 0; i < sendMessages; i++) {
MessageId msgId = producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
originMessageIds.add(msgId.toString());
}
int totalReceived = 0;
- Set<String> retryMessageIds = Sets.newHashSet();
+ Set<String> retryMessageIds = new HashSet<>();
do {
Message<byte[]> message = consumer.receive();
log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
@@ -183,7 +183,7 @@ public class RetryTopicTest extends ProducerConsumerBase {
assertEquals(retryMessageIds, originMessageIds);
int totalInDeadLetter = 0;
- Set<String> deadLetterMessageIds = Sets.newHashSet();
+ Set<String> deadLetterMessageIds = new HashSet<>();
do {
Message message = deadLetterConsumer.receive();
log.info("dead letter consumer received message : {} {}", message.getMessageId(),
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
index 40d34a7c0b7..7cfa35c9d1c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
@@ -23,7 +23,12 @@ import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
-
+import com.google.common.util.concurrent.RateLimiter;
+import com.google.gson.Gson;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -32,7 +37,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-
+import lombok.Cleanup;
import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.awaitility.Awaitility;
@@ -43,15 +48,6 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.RateLimiter;
-import com.google.gson.Gson;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonObject;
-
-import lombok.Cleanup;
-
@Test(groups = "broker-api")
public class SimpleProducerConsumerStatTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(SimpleProducerConsumerStatTest.class);
@@ -114,7 +110,7 @@ public class SimpleProducerConsumerStatTest extends ProducerConsumerBase {
}
Message<byte[]> msg = null;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
for (int i = 0; i < numMessages; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
@@ -153,7 +149,7 @@ public class SimpleProducerConsumerStatTest extends ProducerConsumerBase {
}
Producer<byte[]> producer = producerBuilder.create();
- List<Future<MessageId>> futures = Lists.newArrayList();
+ List<Future<MessageId>> futures = new ArrayList<>();
int numMessages = 50;
// Asynchronously produce messages
@@ -169,7 +165,7 @@ public class SimpleProducerConsumerStatTest extends ProducerConsumerBase {
}
Message<byte[]> msg = null;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
for (int i = 0; i < numMessages; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
@@ -211,7 +207,7 @@ public class SimpleProducerConsumerStatTest extends ProducerConsumerBase {
}
Producer<byte[]> producer = producerBuilder.create();
- List<Future<MessageId>> futures = Lists.newArrayList();
+ List<Future<MessageId>> futures = new ArrayList<>();
int numMessages = 101;
// Asynchronously produce messages
@@ -227,7 +223,7 @@ public class SimpleProducerConsumerStatTest extends ProducerConsumerBase {
}
Message<byte[]> msg = null;
CompletableFuture<Message<byte[]>> future_msg = null;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
for (int i = 0; i < numMessages; i++) {
future_msg = consumer.receiveAsync();
Thread.sleep(10);
@@ -274,7 +270,7 @@ public class SimpleProducerConsumerStatTest extends ProducerConsumerBase {
}
Producer<byte[]> producer = producerBuilder.create();
- List<Future<MessageId>> futures = Lists.newArrayList();
+ List<Future<MessageId>> futures = new ArrayList<>();
// Asynchronously produce messages
for (int i = 0; i < numMessages; i++) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 669161f67c0..08ca6e80a22 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -36,7 +36,6 @@ import static org.testng.Assert.fail;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeType;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
@@ -50,6 +49,7 @@ import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -336,7 +336,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
}
Message<byte[]> msg = null;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
for (int i = 0; i < 10; i++) {
msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
@@ -366,7 +366,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
producerBuilder.batchingMaxMessages(5);
}
Producer<byte[]> producer = producerBuilder.create();
- List<Future<MessageId>> futures = Lists.newArrayList();
+ List<Future<MessageId>> futures = new ArrayList<>();
// Asynchronously produce messages
for (int i = 0; i < 10; i++) {
@@ -381,7 +381,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
}
Message<byte[]> msg = null;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
for (int i = 0; i < 10; i++) {
msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
@@ -423,7 +423,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
producerBuilder.batchingMaxMessages(5);
}
Producer<byte[]> producer = producerBuilder.create();
- List<Future<MessageId>> futures = Lists.newArrayList();
+ List<Future<MessageId>> futures = new ArrayList<>();
// Asynchronously produce messages
for (int i = 0; i < numMessages; i++) {
@@ -1206,7 +1206,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
Thread.sleep(100);
// 5. verify: active subscribers
- Set<String> activeSubscriber = Sets.newHashSet();
+ Set<String> activeSubscriber = new HashSet<>();
ledger.getActiveCursors().forEach(c -> activeSubscriber.add(c.getName()));
assertTrue(activeSubscriber.contains(sub1));
assertFalse(activeSubscriber.contains(sub2));
@@ -1231,8 +1231,8 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
log.info("-- Starting {} test --", methodName);
final int totalMsg = 100;
- final Set<String> produceMsgs = Sets.newHashSet();
- final Set<String> consumeMsgs = Sets.newHashSet();
+ final Set<String> produceMsgs = new HashSet<>();
+ final Set<String> consumeMsgs = new HashSet<>();
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic1")
.subscriptionName("my-subscriber-name").subscribe();
@@ -1271,8 +1271,8 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
log.info("-- Starting {} test --", methodName);
final int totalMsg = 100;
- final Set<String> produceMsgs = Sets.newHashSet();
- final Set<String> consumeMsgs = Sets.newHashSet();
+ final Set<String> produceMsgs = new HashSet<>();
+ final Set<String> consumeMsgs = new HashSet<>();
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic1")
.subscriptionName("my-subscriber-name").subscribe();
@@ -1399,8 +1399,8 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
}
Message<byte[]> msg;
- Set<Message<byte[]>> consumerMsgSet1 = Sets.newHashSet();
- Set<Message<byte[]>> consumerMsgSet2 = Sets.newHashSet();
+ Set<Message<byte[]>> consumerMsgSet1 = new HashSet<>();
+ Set<Message<byte[]>> consumerMsgSet2 = new HashSet<>();
for (int i = 0; i < 5; i++) {
msg = consumer1.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
consumerMsgSet1.add(msg);
@@ -1504,7 +1504,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
// (2) try to consume messages: but will be able to consume number of messages = unAckedMessagesBufferSize
Message<byte[]> msg;
- List<Message<byte[]>> messages = Lists.newArrayList();
+ List<Message<byte[]>> messages = new ArrayList<>();
for (int i = 0; i < totalProducedMsgs; i++) {
msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (msg != null) {
@@ -1583,7 +1583,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
for (int j = 0; j < totalReceiveIteration; j++) {
Message<byte[]> msg;
- List<Message<byte[]>> messages = Lists.newArrayList();
+ List<Message<byte[]>> messages = new ArrayList<>();
for (int i = 0; i < totalProducedMsgs; i++) {
msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (msg != null) {
@@ -1701,7 +1701,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
// (2) Consumer1: consume without ack:
// try to consume messages: but will be able to consume number of messages = maxUnackedMessages
Message<byte[]> msg;
- List<Message<byte[]>> messages = Lists.newArrayList();
+ List<Message<byte[]>> messages = new ArrayList<>();
for (int i = 0; i < totalProducedMsgs; i++) {
msg = consumer1.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (msg != null) {
@@ -1848,7 +1848,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
// (2) try to consume messages: but will be able to consume number of messages = unAckedMessagesBufferSize
Message<byte[]> msg;
- List<Message<byte[]>> messages = Lists.newArrayList();
+ List<Message<byte[]>> messages = new ArrayList<>();
for (int i = 0; i < totalProducedMsgs; i++) {
msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (msg != null) {
@@ -1918,7 +1918,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
Producer<byte[]> producer = producerBuidler.create();
- List<CompletableFuture<MessageId>> futures = Lists.newArrayList();
+ List<CompletableFuture<MessageId>> futures = new ArrayList<>();
// (1) Produced Messages
for (int i = 0; i < totalProducedMsgs; i++) {
String message = "my-message-" + i;
@@ -1930,7 +1930,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
// (2) Consumer1: consume without ack:
// try to consume messages: but will be able to consume number of messages = maxUnackedMessages
Message<byte[]> msg;
- List<Message<byte[]>> messages = Lists.newArrayList();
+ List<Message<byte[]>> messages = new ArrayList<>();
for (int i = 0; i < totalProducedMsgs; i++) {
msg = consumer1.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (msg != null) {
@@ -2012,7 +2012,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
// (2) Consumer1: consume without ack:
// try to consume messages: but will be able to consume number of messages = maxUnackedMessages
Message<byte[]> msg;
- List<Message<byte[]>> messages = Lists.newArrayList();
+ List<Message<byte[]>> messages = new ArrayList<>();
for (int i = 0; i < totalProducedMsgs; i++) {
msg = consumer1.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (msg != null) {
@@ -2091,7 +2091,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
}
Message<byte[]> msg = null;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
for (int i = 0; i < totalMsg; i++) {
msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
@@ -2139,7 +2139,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
// (2) try to consume messages: but will be able to consume number of messages = unAckedMessagesBufferSize
Message<byte[]> msg;
- List<Message<byte[]>> messages1 = Lists.newArrayList();
+ List<Message<byte[]>> messages1 = new ArrayList<>();
for (int i = 0; i < totalProducedMsgs; i++) {
msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (msg != null) {
@@ -2159,7 +2159,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
consumer.redeliverUnacknowledgedMessages(Sets.newHashSet(redeliveryMessages));
Thread.sleep(1000);
- Set<MessageIdImpl> messages2 = Sets.newHashSet();
+ Set<MessageIdImpl> messages2 = new HashSet<>();
for (int i = 0; i < totalProducedMsgs; i++) {
msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (msg != null) {
@@ -2226,7 +2226,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
// (2) try to consume messages: but will be able to consume number of messages = unAckedMessagesBufferSize
Message<byte[]> msg;
- List<Message<byte[]>> messages1 = Lists.newArrayList();
+ List<Message<byte[]>> messages1 = new ArrayList<>();
for (int i = 0; i < totalProducedMsgs; i++) {
msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (msg != null) {
@@ -2245,7 +2245,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
consumer.redeliverUnacknowledgedMessages(Sets.newHashSet(redeliveryMessages));
Thread.sleep(1000);
- Set<MessageIdImpl> messages2 = Sets.newHashSet();
+ Set<MessageIdImpl> messages2 = new HashSet<>();
for (int i = 0; i < totalProducedMsgs; i++) {
msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (msg != null) {
@@ -2297,7 +2297,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic2")
.create();
- List<Future<MessageId>> futures = Lists.newArrayList();
+ List<Future<MessageId>> futures = new ArrayList<>();
// Asynchronously produce messages
for (int i = 0; i < 15; i++) {
@@ -2375,7 +2375,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
.topic("persistent://my-property/my-ns/my-topic2").subscriptionName("my-subscriber-name")
.subscriptionType(SubscriptionType.Shared).receiverQueueSize(queueSize)
.acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
- List<Future<MessageId>> futures = Lists.newArrayList();
+ List<Future<MessageId>> futures = new ArrayList<>();
// Asynchronously produce messages
final int totalPublishMessages = 500;
@@ -2390,7 +2390,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
future.get();
}
- List<Message<byte[]>> messages = Lists.newArrayList();
+ List<Message<byte[]>> messages = new ArrayList<>();
// let consumer1 and consumer2 consume messages up to the queue will be full
for (int i = 0; i < totalPublishMessages; i++) {
@@ -2503,7 +2503,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
producer.flush();
// (1.a) consume first consumeMsgInParts msgs and trigger redeliver
Message<byte[]> msg;
- List<Message<byte[]>> messages1 = Lists.newArrayList();
+ List<Message<byte[]>> messages1 = new ArrayList<>();
for (int i = 0; i < consumeMsgInParts; i++) {
//There is some detailed info about this case.
//https://github.com/apache/pulsar/pull/15088#issuecomment-1113521990
@@ -2635,7 +2635,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
final int totalMsg = 10;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
Consumer<byte[]> cryptoConsumer = pulsarClient.newConsumer()
.topic(topicName).subscriptionName("my-subscriber-name")
@@ -2717,7 +2717,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
final int totalMsg = 10;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/myrsa-topic1")
.subscriptionName("my-subscriber-name").cryptoKeyReader(new EncKeyReader()).subscribe();
Consumer<byte[]> normalConsumer = pulsarClient.newConsumer()
@@ -2805,10 +2805,10 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
final String rsaPrivateKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlFb3dJQkFBS0NBUUVBdEtXd2dxZG5UWXJPQ3YrajFNa1RXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuCmIxMEpjRmY1Wmp6UDlCU1hLK3RIbUk4dW9OMzY4dkV2NnloVVJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0MKN3FkakNON0xESjNNbnFpQklyVXNTYUVQMXdyTnNCMWtJK285RVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVlawpIcUw3c0JsSjk4aDZObXNpY0VhVWthcmRrMFRPWHJsa2pDK2NNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRuMXZ0Cmhwdm5YTHZDbUc0TSs2eHRZdEQ [...]
final int numMsg = 10;
- Map<String, String> privateKeyFileMap = Maps.newHashMap();
+ Map<String, String> privateKeyFileMap = new HashMap<>();
privateKeyFileMap.put("client-ecdsa.pem", ecdsaPrivateKeyFile);
privateKeyFileMap.put("client-rsa.pem", rsaPrivateKeyFile);
- Map<String, String> privateKeyDataMap = Maps.newHashMap();
+ Map<String, String> privateKeyDataMap = new HashMap<>();
privateKeyDataMap.put("client-ecdsa.pem", ecdsaPrivateKeyData);
privateKeyDataMap.put("client-rsa.pem", rsaPrivateKeyData);
@@ -2894,7 +2894,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
final String encryptionKeyName = "client-rsa.pem";
final String encryptionKeyVersion = "1.0";
- Map<String, String> metadata = Maps.newHashMap();
+ Map<String, String> metadata = new HashMap<>();
metadata.put("version", encryptionKeyVersion);
class EncKeyReader implements CryptoKeyReader {
final EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
@@ -3073,7 +3073,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
final int totalMsg = 10;
MessageImpl<byte[]> msg;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("persistent://my-property/use/myenc-ns/myenc-topic1").subscriptionName("my-subscriber-name")
.acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
@@ -3166,7 +3166,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
final String encryptionKeyName = "client-rsa.pem";
final String encryptionKeyVersion = "1.0";
- Map<String, String> metadata = Maps.newHashMap();
+ Map<String, String> metadata = new HashMap<>();
metadata.put("version", encryptionKeyVersion);
class EncKeyReader implements CryptoKeyReader {
final EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
@@ -3505,7 +3505,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
}
Message<byte[]> msg = null;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
for (int i = 0; i < 10; i++) {
msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
@@ -3541,7 +3541,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
}
Message<byte[]> msg = null;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
for (int i = 0; i < 10; i++) {
msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
@@ -3651,7 +3651,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
Integer evenDistributionCount = noOfPartitions / 3;
retryStrategically((test) -> {
try {
- Map<String, Integer> subsCount = Maps.newHashMap();
+ Map<String, Integer> subsCount = new HashMap<>();
admin.topics().getPartitionedStats(topicName, true).getPartitions().forEach((p, stats) -> {
String activeConsumerName = stats.getSubscriptions().entrySet().iterator().next()
.getValue().getActiveConsumerName();
@@ -3667,7 +3667,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
return false;
}, 5, 100);
- Map<String, Integer> subsCount = Maps.newHashMap();
+ Map<String, Integer> subsCount = new HashMap<>();
admin.topics().getPartitionedStats(topicName, true).getPartitions().forEach((p, stats) -> {
String activeConsumerName = stats.getSubscriptions().entrySet().iterator().next().getValue().getActiveConsumerName();
subsCount.compute(activeConsumerName, (k, v) -> v != null ? v + 1 : 1);
@@ -3810,7 +3810,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
Consumer<byte[]> consumer = consumerBuilder.subscriptionName("my-subscriber-name").subscribe();
consumer.seek(resetPos.get());
log.info("reset cursor to {}", resetPos.get());
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
for (int i = firstMessage; i < numOfMessages; i++) {
Message<byte[]> message = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
String receivedMessage = new String(message.getData());
@@ -3843,7 +3843,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
.topic("persistent://my-property/my-ns/my-topic2");
Producer<byte[]> producer = producerBuilder.create();
- List<Future<MessageId>> futures = Lists.newArrayList();
+ List<Future<MessageId>> futures = new ArrayList<>();
// Asynchronously produce messages
byte[] message = new byte[ClientCnx.getMaxMessageSize() + 1];
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
index dc6041186bb..c5a7de1abec 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
@@ -23,10 +23,10 @@ import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.base.MoreObjects;
import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
import java.time.Clock;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@@ -91,7 +91,7 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
}
Message<JsonEncodedPojo> msg = null;
- Set<JsonEncodedPojo> messageSet = Sets.newHashSet();
+ Set<JsonEncodedPojo> messageSet = new HashSet<>();
for (int i = 0; i < 10; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
JsonEncodedPojo receivedMessage = msg.getValue();
@@ -207,7 +207,7 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
}
Message<org.apache.pulsar.client.api.schema.proto.Test.TestMessage> msg = null;
- Set<org.apache.pulsar.client.api.schema.proto.Test.TestMessage> messageSet = Sets.newHashSet();
+ Set<org.apache.pulsar.client.api.schema.proto.Test.TestMessage> messageSet = new HashSet<>();
for (int i = 0; i < 10; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
org.apache.pulsar.client.api.schema.proto.Test.TestMessage receivedMessage = msg.getValue();
@@ -287,7 +287,7 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
}
Message<AvroEncodedPojo> msg = null;
- Set<AvroEncodedPojo> messageSet = Sets.newHashSet();
+ Set<AvroEncodedPojo> messageSet = new HashSet<>();
for (int i = 0; i < 10; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
AvroEncodedPojo receivedMessage = msg.getValue();
@@ -457,7 +457,7 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
.subscribe();
Message<GenericRecord> msg = null;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
for (int i = 0; i < 10; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
GenericRecord receivedMessage = msg.getValue();
@@ -505,7 +505,7 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
.create();
Message<GenericRecord> msg = null;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
for (int i = 0; i < 10; i++) {
msg = reader.readNext();
GenericRecord receivedMessage = msg.getValue();
@@ -578,7 +578,7 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
.subscribe();
Message<GenericRecord> msg = null;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
for (int i = 0; i < 20; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
GenericRecord receivedMessage = msg.getValue();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java
index 73d89b2356c..3170a101ff8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java
@@ -138,7 +138,7 @@ public class TokenAuthenticatedProducerConsumerTest extends ProducerConsumerBase
}
Message<byte[]> msg = null;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
for (int i = 0; i < 10; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
index 5d2e473d582..cc1bd8cbe59 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
@@ -132,7 +132,7 @@ public class TokenOauth2AuthenticatedProducerConsumerTest extends ProducerConsum
}
Message<byte[]> msg = null;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
for (int i = 0; i < 10; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
@@ -184,7 +184,7 @@ public class TokenOauth2AuthenticatedProducerConsumerTest extends ProducerConsum
}
Message<byte[]> msg = null;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
for (int i = 0; i < 10; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
@@ -221,7 +221,7 @@ public class TokenOauth2AuthenticatedProducerConsumerTest extends ProducerConsum
}
msg = null;
- messageSet = Sets.newHashSet();
+ messageSet = new HashSet<>();
for (int i = 0; i < 10; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
index 72252309e65..618cd1aa1aa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
@@ -24,13 +24,12 @@ import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -40,9 +39,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
-
import lombok.Cleanup;
-
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
@@ -129,7 +126,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
}
Message<byte[]> msg = null;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
for (int i = 0; i < 10; i++) {
msg = reader.readNext(1, TimeUnit.SECONDS);
@@ -160,7 +157,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
}
Message<byte[]> msg = null;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
for (int i = 0; i < 10; i++) {
msg = reader.readNext(1, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
@@ -185,7 +182,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
.startMessageId(MessageId.earliest).create();
Message<byte[]> msg = null;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
for (int i = 0; i < 10; i++) {
msg = reader.readNext(1, TimeUnit.SECONDS);
@@ -215,7 +212,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
.startMessageId(MessageId.earliest).create();
Message<byte[]> msg = null;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
for (int i = 0; i < 10; i++) {
msg = reader.readNext(1, TimeUnit.SECONDS);
@@ -243,7 +240,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
.startMessageId(MessageId.earliest).create();
Message<byte[]> msg = null;
- Set<String> messageSet1 = Sets.newHashSet();
+ Set<String> messageSet1 = new HashSet<>();
for (int i = 0; i < 10; i++) {
msg = reader1.readNext(1, TimeUnit.SECONDS);
@@ -253,7 +250,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
testMessageOrderAndDuplicates(messageSet1, receivedMessage, expectedMessage);
}
- Set<String> messageSet2 = Sets.newHashSet();
+ Set<String> messageSet2 = new HashSet<>();
for (int i = 0; i < 10; i++) {
msg = reader2.readNext(1, TimeUnit.SECONDS);
@@ -286,14 +283,14 @@ public class TopicReaderTest extends ProducerConsumerBase {
.startMessageId(MessageId.earliest).create();
Message<byte[]> msg = null;
- Set<String> messageSet1 = Sets.newHashSet();
+ Set<String> messageSet1 = new HashSet<>();
for (int i = 0; i < 10; i++) {
msg = reader1.readNext(1, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
assertTrue(messageSet1.add(receivedMessage));
}
- Set<String> messageSet2 = Sets.newHashSet();
+ Set<String> messageSet2 = new HashSet<>();
for (int i = 0; i < 10; i++) {
msg = reader2.readNext(1, TimeUnit.SECONDS);
@@ -377,7 +374,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
}
// Publish more messages and verify the readers only sees new messages
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
for (int i = halfOfMsgs; i < numOfMessages; i++) {
Message<byte[]> message = reader.readNext();
String receivedMessage = new String(message.getData());
@@ -426,7 +423,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
}
// Publish more messages and verify the readers only sees new messages
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
for (int i = halfOfMsgs; i < numOfMessages; i++) {
Message<byte[]> message = reader.readNext();
assertFalse(oldMessage.contains(message));
@@ -457,7 +454,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
// Publish more messages and verify the readers only sees messages starting from the intended message
Message<byte[]> msg = null;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
for (int i = 5; i < 10; i++) {
msg = reader.readNext(1, TimeUnit.SECONDS);
@@ -557,7 +554,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
final int totalMsg = 10;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
Reader<byte[]> reader = pulsarClient.newReader()
.topic("persistent://my-property/my-ns/test-reader-myecdsa-topic1").startMessageId(MessageId.latest)
.cryptoKeyReader(new EncKeyReader()).create();
@@ -627,7 +624,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
final int totalMsg = 10;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
String topic = "persistent://my-property/my-ns/test-multi-reader-myecdsa-topic1";
admin.topics().createPartitionedTopic(topic, 3);
Reader<byte[]> reader = pulsarClient.newReader()
@@ -667,10 +664,10 @@ public class TopicReaderTest extends ProducerConsumerBase {
final String rsaPrivateKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlFb3dJQkFBS0NBUUVBdEtXd2dxZG5UWXJPQ3YrajFNa1RXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuCmIxMEpjRmY1Wmp6UDlCU1hLK3RIbUk4dW9OMzY4dkV2NnloVVJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0MKN3FkakNON0xESjNNbnFpQklyVXNTYUVQMXdyTnNCMWtJK285RVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVlawpIcUw3c0JsSjk4aDZObXNpY0VhVWthcmRrMFRPWHJsa2pDK2NNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRuMXZ0Cmhwdm5YTHZDbUc0TSs2eHRZdEQ [...]
final int numMsg = 10;
- Map<String, String> privateKeyFileMap = Maps.newHashMap();
+ Map<String, String> privateKeyFileMap = new HashMap<>();
privateKeyFileMap.put("client-ecdsa.pem", ecdsaPrivateKeyFile);
privateKeyFileMap.put("client-rsa.pem", rsaPrivateKeyFile);
- Map<String, String> privateKeyDataMap = Maps.newHashMap();
+ Map<String, String> privateKeyDataMap = new HashMap<>();
privateKeyDataMap.put("client-ecdsa.pem", ecdsaPrivateKeyData);
privateKeyDataMap.put("client-rsa.pem", rsaPrivateKeyData);
@@ -759,7 +756,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
}
MessageImpl<byte[]> msg = null;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
int index = 0;
// read message till end.
@@ -815,7 +812,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
}
TopicMessageImpl<byte[]> msg = null;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
int index = 0;
// read message till end.
@@ -1214,7 +1211,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
assertTrue(reader.hasMessageAvailable());
// Read all messages the first time
- Set<String> messageSetA = Sets.newHashSet();
+ Set<String> messageSetA = new HashSet<>();
for (int i = 0; i < numOfMessage; i++) {
Message<byte[]> message = reader.readNext();
String receivedMessage = new String(message.getData());
@@ -1228,7 +1225,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
reader.seek(RelativeTimeUtil.parseRelativeTimeInSeconds("-1m"));
// Read all messages a second time after seek()
- Set<String> messageSetB = Sets.newHashSet();
+ Set<String> messageSetB = new HashSet<>();
for (int i = 0; i < numOfMessage; i++) {
Message<byte[]> message = reader.readNext();
String receivedMessage = new String(message.getData());
@@ -1262,7 +1259,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
assertTrue(reader.hasMessageAvailable());
// Read all messages the first time
- Set<String> messageSetA = Sets.newHashSet();
+ Set<String> messageSetA = new HashSet<>();
for (int i = 0; i < numOfMessage; i++) {
Message<byte[]> message = reader.readNext();
String receivedMessage = new String(message.getData());
@@ -1275,7 +1272,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
reader.seek(RelativeTimeUtil.parseRelativeTimeInSeconds("-1m"));
// Read all messages a second time after seek()
- Set<String> messageSetB = Sets.newHashSet();
+ Set<String> messageSetB = new HashSet<>();
for (int i = 0; i < numOfMessage; i++) {
Message<byte[]> message = reader.readNext();
String receivedMessage = new String(message.getData());
@@ -1311,7 +1308,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
// Read all messages the first time
MessageId midmessageToSeek = null;
- Set<String> messageSetA = Sets.newHashSet();
+ Set<String> messageSetA = new HashSet<>();
for (int i = 0; i < numOfMessage; i++) {
Message<byte[]> message = reader.readNext();
String receivedMessage = new String(message.getData());
@@ -1329,7 +1326,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
reader.seek(midmessageToSeek);
// Read all halved messages after seek()
- Set<String> messageSetB = Sets.newHashSet();
+ Set<String> messageSetB = new HashSet<>();
for (int i = halfMessages + 1; i < numOfMessage; i++) {
Message<byte[]> message = reader.readNext();
String receivedMessage = new String(message.getData());
@@ -1367,7 +1364,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
int plusTime = (halfMessages + 1) * 100;
reader.seek(l + plusTime);
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
for (int i = halfMessages + 1; i < numOfMessage; i++) {
Message<byte[]> message = reader.readNext();
String receivedMessage = new String(message.getData());
@@ -1397,7 +1394,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
Reader<byte[]> reader = pulsarClient.newReader().topic(topicName).startMessageId(MessageId.earliest).create();
reader.seek(halfTime);
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
for (int i = halfMessages + 1; i < numOfMessage; i++) {
Message<byte[]> message = reader.readNext(10, TimeUnit.SECONDS);
String receivedMessage = new String(message.getData());
@@ -1452,7 +1449,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
}
Reader<byte[]> reader = readerBuilder.create();
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
for (int i = firstMessage; i < numOfMessages; i++) {
Message<byte[]> message = reader.readNext();
String receivedMessage = new String(message.getData());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
index 4f9c9080646..097cf4823de 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
@@ -27,11 +27,12 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
-import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -138,7 +139,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
}
Message<String> msg = null;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
for (int i = 0; i < 10; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
String receivedMessage = msg.getValue();
@@ -168,7 +169,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
.enableBatching(batchMessageDelayMs != 0)
.create();
- List<Future<MessageId>> futures = Lists.newArrayList();
+ List<Future<MessageId>> futures = new ArrayList<>();
// Asynchronously produce messages
for (int i = 0; i < 10; i++) {
@@ -183,7 +184,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
}
Message<byte[]> msg = null;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
for (int i = 0; i < 10; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
@@ -226,7 +227,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
.enableBatching(batchMessageDelayMs != 0)
.create();
- List<Future<MessageId>> futures = Lists.newArrayList();
+ List<Future<MessageId>> futures = new ArrayList<>();
// Asynchronously produce messages
for (int i = 0; i < numMessages; i++) {
@@ -740,8 +741,8 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
log.info("-- Starting {} test --", methodName);
final int totalMsg = 100;
- final Set<String> produceMsgs = Sets.newHashSet();
- final Set<String> consumeMsgs = Sets.newHashSet();
+ final Set<String> produceMsgs = new HashSet<>();
+ final Set<String> consumeMsgs = new HashSet<>();
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("persistent://my-property/use/my-ns/my-topic1")
@@ -784,8 +785,8 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
log.info("-- Starting {} test --", methodName);
final int totalMsg = 100;
- final Set<String> produceMsgs = Sets.newHashSet();
- final Set<String> consumeMsgs = Sets.newHashSet();
+ final Set<String> produceMsgs = new HashSet<>();
+ final Set<String> consumeMsgs = new HashSet<>();
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("persistent://my-property/use/my-ns/my-topic1")
@@ -877,8 +878,8 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
}
Message<byte[]>msg = null;
- Set<Message<byte[]>> consumerMsgSet1 = Sets.newHashSet();
- Set<Message<byte[]>> consumerMsgSet2 = Sets.newHashSet();
+ Set<Message<byte[]>> consumerMsgSet1 = new HashSet<>();
+ Set<Message<byte[]>> consumerMsgSet2 = new HashSet<>();
for (int i = 0; i < 5; i++) {
msg = consumer1.receive();
consumerMsgSet1.add(msg);
@@ -985,7 +986,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
// (2) try to consume messages: but will be able to consume number of messages = unAckedMessagesBufferSize
Message<byte[]>msg = null;
- List<Message<byte[]>> messages = Lists.newArrayList();
+ List<Message<byte[]>> messages = new ArrayList<>();
for (int i = 0; i < totalProducedMsgs; i++) {
msg = consumer.receive(1, TimeUnit.SECONDS);
if (msg != null) {
@@ -1073,7 +1074,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
for (int j = 0; j < totalReceiveIteration; j++) {
Message<byte[]>msg = null;
- List<Message<byte[]>> messages = Lists.newArrayList();
+ List<Message<byte[]>> messages = new ArrayList<>();
for (int i = 0; i < totalProducedMsgs; i++) {
msg = consumer.receive(1, TimeUnit.SECONDS);
if (msg != null) {
@@ -1155,7 +1156,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
// (2) Consumer1: consume without ack:
// try to consume messages: but will be able to consume number of messages = maxUnackedMessages
Message<byte[]>msg = null;
- List<Message<byte[]>> messages = Lists.newArrayList();
+ List<Message<byte[]>> messages = new ArrayList<>();
for (int i = 0; i < totalProducedMsgs; i++) {
msg = consumer1.receive(1, TimeUnit.SECONDS);
if (msg != null) {
@@ -1309,7 +1310,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
// (2) try to consume messages: but will be able to consume number of messages = unAckedMessagesBufferSize
Message<byte[]>msg = null;
- List<Message<byte[]>> messages = Lists.newArrayList();
+ List<Message<byte[]>> messages = new ArrayList<>();
for (int i = 0; i < totalProducedMsgs; i++) {
msg = consumer.receive(1, TimeUnit.SECONDS);
if (msg != null) {
@@ -1375,7 +1376,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
.batchingMaxMessages(5)
.create();
- List<CompletableFuture<MessageId>> futures = Lists.newArrayList();
+ List<CompletableFuture<MessageId>> futures = new ArrayList<>();
// (1) Produced Messages
for (int i = 0; i < totalProducedMsgs; i++) {
String message = "my-message-" + i;
@@ -1387,7 +1388,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
// (2) Consumer1: consume without ack:
// try to consume messages: but will be able to consume number of messages = maxUnackedMessages
Message<byte[]>msg = null;
- List<Message<byte[]>> messages = Lists.newArrayList();
+ List<Message<byte[]>> messages = new ArrayList<>();
for (int i = 0; i < totalProducedMsgs; i++) {
msg = consumer1.receive(1, TimeUnit.SECONDS);
if (msg != null) {
@@ -1473,7 +1474,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
// (2) Consumer1: consume without ack:
// try to consume messages: but will be able to consume number of messages = maxUnackedMessages
Message<byte[]>msg = null;
- List<Message<byte[]>> messages = Lists.newArrayList();
+ List<Message<byte[]>> messages = new ArrayList<>();
for (int i = 0; i < totalProducedMsgs; i++) {
msg = consumer1.receive(1, TimeUnit.SECONDS);
if (msg != null) {
@@ -1556,7 +1557,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
}
Message<byte[]>msg = null;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
for (int i = 0; i < totalMsg; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
@@ -1609,7 +1610,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
// (2) try to consume messages: but will be able to consume number of messages = unAckedMessagesBufferSize
Message<byte[]>msg = null;
- List<Message<byte[]>> messages1 = Lists.newArrayList();
+ List<Message<byte[]>> messages1 = new ArrayList<>();
for (int i = 0; i < totalProducedMsgs; i++) {
msg = consumer.receive(1, TimeUnit.SECONDS);
if (msg != null) {
@@ -1630,7 +1631,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
consumer.redeliverUnacknowledgedMessages(Sets.newHashSet(redeliveryMessages));
Thread.sleep(1000);
- Set<MessageIdImpl> messages2 = Sets.newHashSet();
+ Set<MessageIdImpl> messages2 = new HashSet<>();
for (int i = 0; i < totalProducedMsgs; i++) {
msg = consumer.receive(1, TimeUnit.SECONDS);
if (msg != null) {
@@ -1705,7 +1706,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
// (2) try to consume messages: but will be able to consume number of messages = unAckedMessagesBufferSize
Message<byte[]>msg = null;
- List<Message<byte[]>> messages1 = Lists.newArrayList();
+ List<Message<byte[]>> messages1 = new ArrayList<>();
for (int i = 0; i < totalProducedMsgs; i++) {
msg = consumer.receive(1, TimeUnit.SECONDS);
if (msg != null) {
@@ -1726,7 +1727,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
consumer.redeliverUnacknowledgedMessages(Sets.newHashSet(redeliveryMessages));
Thread.sleep(1000);
- Set<MessageIdImpl> messages2 = Sets.newHashSet();
+ Set<MessageIdImpl> messages2 = new HashSet<>();
for (int i = 0; i < totalProducedMsgs; i++) {
msg = consumer.receive(1, TimeUnit.SECONDS);
if (msg != null) {
@@ -1782,7 +1783,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
Producer<byte[]> producer = pulsarClient.newProducer()
.topic("persistent://my-property/use/my-ns/my-topic2")
.create();
- List<Future<MessageId>> futures = Lists.newArrayList();
+ List<Future<MessageId>> futures = new ArrayList<>();
// Asynchronously produce messages
for (int i = 0; i < 15; i++) {
@@ -1857,7 +1858,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
.topic("persistent://my-property/use/my-ns/my-topic2")
.enableBatching(false)
.create();
- List<Future<MessageId>> futures = Lists.newArrayList();
+ List<Future<MessageId>> futures = new ArrayList<>();
// Asynchronously produce messages
final int totalPublishMessages = 500;
@@ -1872,7 +1873,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
future.get();
}
- List<Message<byte[]>> messages = Lists.newArrayList();
+ List<Message<byte[]>> messages = new ArrayList<>();
// let consumer1 and consumer2 cosume messages up to the queue will be full
for (int i = 0; i < totalPublishMessages; i++) {
@@ -1985,7 +1986,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
}
// (1.a) consume first consumeMsgInParts msgs and trigger redeliver
Message<byte[]> msg = null;
- List<Message<byte[]>> messages1 = Lists.newArrayList();
+ List<Message<byte[]>> messages1 = new ArrayList<>();
for (int i = 0; i < consumeMsgInParts; i++) {
msg = consumer.receive(1, TimeUnit.SECONDS);
if (msg != null) {
@@ -2121,7 +2122,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
final int totalMsg = 10;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("persistent://my-property/use/my-ns/myecdsa-topic1")
.subscriptionName("my-subscriber-name")
@@ -2197,7 +2198,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
final int totalMsg = 10;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("persistent://my-property/use/my-ns/myrsa-topic1")
.subscriptionName("my-subscriber-name")
@@ -2279,7 +2280,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
final int totalMsg = 10;
Message<String> msg = null;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic("persistent://my-property/use/my-ns/myenc-topic1")
.subscriptionName("my-subscriber-name")
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AdminApiKeyStoreTlsAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AdminApiKeyStoreTlsAuthTest.java
index 22400e0ae64..04b05b36d0e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AdminApiKeyStoreTlsAuthTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AdminApiKeyStoreTlsAuthTest.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.client.impl;
import static org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls.mapToString;
import static org.testng.AssertJUnit.fail;
-import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import io.jsonwebtoken.SignatureAlgorithm;
import java.util.HashMap;
@@ -172,9 +171,9 @@ public class AdminApiKeyStoreTlsAuthTest extends ProducerConsumerBase {
try (PulsarAdmin admin = buildAdminClient()) {
admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
admin.tenants().createTenant("tenant1",
- new TenantInfoImpl(ImmutableSet.of("foobar"),
- ImmutableSet.of("test")));
- Assert.assertEquals(ImmutableSet.of("tenant1"), admin.tenants().getTenants());
+ new TenantInfoImpl(Set.of("foobar"),
+ Set.of("test")));
+ Assert.assertEquals(Set.of("tenant1"), admin.tenants().getTenants());
}
}
@@ -183,8 +182,8 @@ public class AdminApiKeyStoreTlsAuthTest extends ProducerConsumerBase {
try (PulsarAdmin admin = buildAdminClient()) {
admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
admin.tenants().createTenant("tenant1",
- new TenantInfoImpl(ImmutableSet.of("proxy"),
- ImmutableSet.of("test")));
+ new TenantInfoImpl(Set.of("proxy"),
+ Set.of("test")));
admin.namespaces().createNamespace("tenant1/ns1");
Assert.assertTrue(admin.namespaces().getNamespaces("tenant1").contains("tenant1/ns1"));
}
@@ -195,12 +194,12 @@ public class AdminApiKeyStoreTlsAuthTest extends ProducerConsumerBase {
try (PulsarAdmin admin = buildAdminClient()) {
admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
admin.tenants().createTenant("tenant1",
- new TenantInfoImpl(ImmutableSet.of("proxy", "user1"),
- ImmutableSet.of("test")));
+ new TenantInfoImpl(Set.of("proxy", "user1"),
+ Set.of("test")));
admin.namespaces().createNamespace("tenant1/ns1");
}
WebTarget root = buildWebClient();
- Assert.assertEquals(ImmutableSet.of("tenant1/ns1"),
+ Assert.assertEquals(Set.of("tenant1/ns1"),
root.path("/admin/v2/namespaces").path("tenant1")
.request(MediaType.APPLICATION_JSON)
.header("X-Original-Principal", "user1")
@@ -214,9 +213,9 @@ public class AdminApiKeyStoreTlsAuthTest extends ProducerConsumerBase {
try (PulsarAdmin admin = buildAdminClient()) {
admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
admin.tenants().createTenant("tenant1",
- new TenantInfoImpl(ImmutableSet.of("foobar"),
- ImmutableSet.of("test")));
- Assert.assertEquals(ImmutableSet.of("tenant1"), admin.tenants().getTenants());
+ new TenantInfoImpl(Set.of("foobar"),
+ Set.of("test")));
+ Assert.assertEquals(Set.of("tenant1"), admin.tenants().getTenants());
admin.namespaces().createNamespace("tenant1/ns1");
@@ -259,7 +258,7 @@ public class AdminApiKeyStoreTlsAuthTest extends ProducerConsumerBase {
admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
admin.tenants().createTenant("tenant1",
- new TenantInfoImpl(ImmutableSet.of("foobar"),
- ImmutableSet.of("test")));
+ new TenantInfoImpl(Set.of("foobar"),
+ Set.of("test")));
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index a8c3d353e84..e3af761429a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -37,7 +37,6 @@ import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import java.io.InputStream;
@@ -46,6 +45,7 @@ import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.NavigableMap;
import java.util.Optional;
@@ -367,7 +367,7 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase {
String message = "my-message-" + i;
lastNonBatchedMessageId = producer.send(message.getBytes());
}
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
Message<byte[]> msg = null;
for (int i = 0; i < numMessagesPerBatch; i++) {
msg = consumer1.receive(1, TimeUnit.SECONDS);
@@ -563,7 +563,7 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase {
ClientCnx cnx = producer.cnx();
assertTrue(cnx.channel().isActive());
- final List<CompletableFuture<Producer<byte[]>>> futures = Lists.newArrayList();
+ final List<CompletableFuture<Producer<byte[]>>> futures = new ArrayList<>();
final int totalProducers = 10;
CountDownLatch latch = new CountDownLatch(totalProducers);
for (int i = 0; i < totalProducers; i++) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
index 9fbea7f2914..3d8b18c52e6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
@@ -19,10 +19,10 @@
package org.apache.pulsar.client.impl;
import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
-import com.google.common.collect.Lists;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.IntStream;
@@ -63,7 +63,7 @@ public class ConnectionPoolTest extends MockedPulsarServiceBaseTest {
conf.setServiceUrl(serviceUrl);
PulsarClientImpl client = new PulsarClientImpl(conf, eventLoop, pool);
- List<InetSocketAddress> result = Lists.newArrayList();
+ List<InetSocketAddress> result = new ArrayList<>();
result.add(new InetSocketAddress("127.0.0.1", brokerPort));
Mockito.when(pool.resolveName(InetSocketAddress.createUnresolved("non-existing-dns-name",
brokerPort)))
@@ -83,7 +83,7 @@ public class ConnectionPoolTest extends MockedPulsarServiceBaseTest {
conf.setServiceUrl(serviceUrl);
PulsarClientImpl client = new PulsarClientImpl(conf, eventLoop, pool);
- List<InetSocketAddress> result = Lists.newArrayList();
+ List<InetSocketAddress> result = new ArrayList<>();
// Add a non existent IP to the response to check that we're trying the 2nd address as well
result.add(new InetSocketAddress("127.0.0.99", brokerPort));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java
index 36d3bf6c440..d284853050e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java
@@ -18,8 +18,17 @@
*/
package org.apache.pulsar.client.impl;
-import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import lombok.Cleanup;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.pulsar.client.api.Consumer;
@@ -35,15 +44,6 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
@Test(groups = "broker-impl")
public class KeySharedSubscriptionTest extends ProducerConsumerBase {
@@ -75,7 +75,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
.build();
final int totalMsg = 1000;
String topic = "broker-close-test-" + RandomStringUtils.randomAlphabetic(5);
- Map<Consumer<?>, List<MessageId>> nameToId = Maps.newConcurrentMap();
+ Map<Consumer<?>, List<MessageId>> nameToId = new ConcurrentHashMap<>();
Set<MessageId> pubMessages = Sets.newConcurrentHashSet();
Set<MessageId> recMessages = Sets.newConcurrentHashSet();
AtomicLong lastActiveTime = new AtomicLong();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
index 783e971f391..2a7b8fe8d98 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
@@ -23,12 +23,12 @@ import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
@@ -137,7 +137,7 @@ public class MessageChunkingTest extends ProducerConsumerBase {
PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topicName).get().get();
- List<String> publishedMessages = Lists.newArrayList();
+ List<String> publishedMessages = new ArrayList<>();
for (int i = 0; i < totalMessages; i++) {
String message = createMessagePayload(i * 100);
publishedMessages.add(message);
@@ -145,8 +145,8 @@ public class MessageChunkingTest extends ProducerConsumerBase {
}
Message<byte[]> msg = null;
- Set<String> messageSet = Sets.newHashSet();
- List<MessageId> msgIds = Lists.newArrayList();
+ Set<String> messageSet = new HashSet<>();
+ List<MessageId> msgIds = new ArrayList<>();
for (int i = 0; i < totalMessages; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
@@ -227,7 +227,7 @@ public class MessageChunkingTest extends ProducerConsumerBase {
PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topicName).get().get();
- List<String> publishedMessages = Lists.newArrayList();
+ List<String> publishedMessages = new ArrayList<>();
for (int i = 0; i < totalMessages; i++) {
String message = createMessagePayload(i * 100);
publishedMessages.add(message);
@@ -235,7 +235,7 @@ public class MessageChunkingTest extends ProducerConsumerBase {
}
Message<byte[]> msg = null;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
for (int i = 0; i < totalMessages; i++) {
msg = reader.readNext(5, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
@@ -328,7 +328,7 @@ public class MessageChunkingTest extends ProducerConsumerBase {
Producer<byte[]>[] producers = new Producer[totalProducers];
int totalPublishedMessages = totalProducers;
- List<CompletableFuture<MessageId>> futures = Lists.newArrayList();
+ List<CompletableFuture<MessageId>> futures = new ArrayList<>();
for (int i = 0; i < totalProducers; i++) {
producers[i] = producerBuilder.enableChunking(true).enableBatching(false).create();
int index = i;
@@ -341,7 +341,7 @@ public class MessageChunkingTest extends ProducerConsumerBase {
PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topicName).get().get();
Message<byte[]> msg = null;
- Set<String> messageSet = Sets.newHashSet();
+ Set<String> messageSet = new HashSet<>();
for (int i = 0; i < totalMessages; i++) {
msg = consumer.receive(1, TimeUnit.SECONDS);
if (msg == null) {
@@ -501,7 +501,7 @@ public class MessageChunkingTest extends ProducerConsumerBase {
}
Message<byte[]> msg = null;
- List<MessageId> msgIds = Lists.newArrayList();
+ List<MessageId> msgIds = new ArrayList<>();
for (int i = 0; i < totalMessages; i++) {
msg = consumer1.receive(5, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageParserTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageParserTest.java
index 95cfdd9f10c..92c2d767098 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageParserTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageParserTest.java
@@ -19,16 +19,12 @@
package org.apache.pulsar.client.impl;
import static org.testng.Assert.assertEquals;
-
-import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
-
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
-
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -38,12 +34,12 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.api.raw.MessageParser;
import org.apache.pulsar.common.api.raw.RawMessage;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.protocol.Commands;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -116,7 +112,7 @@ public class MessageParserTest extends MockedPulsarServiceBaseTest {
if (batchEnabled) {
Entry entry = cursor.readEntriesOrWait(1).get(0);
- List<RawMessage> messages = Lists.newArrayList();
+ List<RawMessage> messages = new ArrayList<>();
ByteBuf headsAndPayload = entry.getDataBuffer();
try {
@@ -147,8 +143,8 @@ public class MessageParserTest extends MockedPulsarServiceBaseTest {
List<Entry> entries = cursor.readEntriesOrWait(n);
assertEquals(entries.size(), n);
- List<ByteBuf> headsAndPayloadList = Lists.newArrayList();
- List<RawMessage> messages = Lists.newArrayList();
+ List<ByteBuf> headsAndPayloadList = new ArrayList<>();
+ List<RawMessage> messages = new ArrayList<>();
for (Entry entry : entries) {
ByteBuf headsAndPayload = entry.getDataBuffer();
headsAndPayloadList.add(headsAndPayload);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
index edb5f0cd88d..2974c39744f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
@@ -23,7 +23,6 @@ import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
-import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@@ -259,7 +258,7 @@ public class MultiTopicsReaderTest extends MockedPulsarServiceBaseTest {
Reader<byte[]> reader = pulsarClient.newReader().topic(topic)
.startMessageFromRollbackDuration(2, TimeUnit.HOURS).create();
- List<MessageId> receivedMessageIds = Lists.newArrayList();
+ List<MessageId> receivedMessageIds = new ArrayList<>();
while (reader.hasMessageAvailable()) {
Message<byte[]> msg = reader.readNext(1, TimeUnit.SECONDS);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
index 53ae8f97008..9c6407ac712 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
@@ -23,7 +23,6 @@ import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
-import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.ArrayList;
@@ -300,7 +299,7 @@ public class ReaderTest extends MockedPulsarServiceBaseTest {
Reader<byte[]> reader = pulsarClient.newReader().topic(topic)
.startMessageFromRollbackDuration(2, TimeUnit.HOURS).create();
- List<MessageId> receivedMessageIds = Lists.newArrayList();
+ List<MessageId> receivedMessageIds = new ArrayList<>();
while (reader.hasMessageAvailable()) {
Message<byte[]> msg = reader.readNext(1, TimeUnit.SECONDS);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index 8d068d65114..d3702d04e06 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -271,7 +271,7 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase {
assertTrue(consumer instanceof MultiTopicsConsumerImpl);
// Asynchronously produce messages
- List<Future<MessageId>> futures = Lists.newArrayList();
+ List<Future<MessageId>> futures = new ArrayList<>();
for (int i = 0; i < totalMessages / 3; i++) {
futures.add(producer1.sendAsync((messagePredicate + "producer1-" + i).getBytes()));
futures.add(producer2.sendAsync((messagePredicate + "producer2-" + i).getBytes()));
@@ -697,7 +697,7 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase {
try {
pulsarClient.newConsumer()
- .topics(Lists.newArrayList())
+ .topics(new ArrayList<>())
.subscriptionName(subscriptionName)
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java
index 8a66fa75353..73ebd56e2af 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.client.impl;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
-
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -34,9 +33,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
-
-import com.google.common.collect.Lists;
-
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
@@ -156,7 +152,7 @@ public class ZeroQueueSizeTest extends BrokerTestBase {
.create();
// 3. Create Consumer
- List<Message<byte[]>> messages = Lists.newArrayList();
+ List<Message<byte[]>> messages = new ArrayList<>();
CountDownLatch latch = new CountDownLatch(totalMessages);
ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName)
.subscriptionName(subscriptionName).receiverQueueSize(0).messageListener((cons, msg) -> {