You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ni...@apache.org on 2022/09/17 09:27:45 UTC
[pulsar] branch master updated: [improve][test] remove powermock-reflect dependency (#17696)
This is an automated email from the ASF dual-hosted git repository.
nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 492c7df1579 [improve][test] remove powermock-reflect dependency (#17696)
492c7df1579 is described below
commit 492c7df1579c12701da6a77af1986d3aa7ace840
Author: tison <wa...@gmail.com>
AuthorDate: Sat Sep 17 17:27:34 2022 +0800
[improve][test] remove powermock-reflect dependency (#17696)
---
pom.xml | 13 ---
.../pulsar/broker/namespace/OwnershipCache.java | 6 +-
.../service/persistent/PersistentSubscription.java | 5 ++
.../broker/service/persistent/PersistentTopic.java | 5 ++
.../pendingack/impl/PendingAckHandleImpl.java | 6 ++
.../broker/BookKeeperClientFactoryImplTest.java | 8 +-
.../pulsar/broker/PulsarServiceCloseTest.java | 21 +++--
.../broker/admin/AdminApiGetLastMessageIdTest.java | 19 +----
.../pulsar/broker/admin/PersistentTopicsTest.java | 7 +-
.../pulsar/broker/admin/TopicAutoCreationTest.java | 28 +++----
.../org/apache/pulsar/broker/admin/TopicsTest.java | 4 +-
.../broker/loadbalance/LoadBalancerTest.java | 78 ++++++++---------
.../loadbalance/ModularLoadManagerImplTest.java | 30 ++-----
.../broker/namespace/NamespaceServiceTest.java | 9 +-
.../broker/service/ExclusiveProducerTest.java | 9 +-
.../pulsar/broker/service/TopicOwnerTest.java | 97 ++--------------------
.../broker/stats/ManagedCursorMetricsTest.java | 5 +-
.../systopic/PartitionedSystemTopicTest.java | 25 +++---
.../pulsar/broker/transaction/TransactionTest.java | 52 ++++++------
.../buffer/TopicTransactionBufferTest.java | 4 +-
.../buffer/TransactionLowWaterMarkTest.java | 12 +--
.../pendingack/PendingAckInMemoryDeleteTest.java | 34 ++++----
.../client/api/SimpleProducerConsumerTest.java | 24 +++++-
.../client/api/v1/V1_ProducerConsumerTest.java | 25 +++++-
.../pulsar/client/impl/NegativeAcksTest.java | 23 ++---
.../apache/pulsar/client/impl/ConsumerImpl.java | 6 ++
.../pulsar/client/impl/NegativeAcksTracker.java | 7 ++
.../pulsar/client/impl/PulsarClientImpl.java | 2 +-
.../pulsar/client/impl/schema/AvroSchemaTest.java | 2 -
29 files changed, 246 insertions(+), 320 deletions(-)
diff --git a/pom.xml b/pom.xml
index 5618e95b660..5f147915ff1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -241,7 +241,6 @@ flexible messaging model and an intuitive client API.</description>
<kerby.version>1.1.1</kerby.version>
<testng.version>7.3.0</testng.version>
<mockito.version>3.12.4</mockito.version>
- <powermock.version>2.0.9</powermock.version>
<javassist.version>3.25.0-GA</javassist.version>
<skyscreamer.version>1.5.0</skyscreamer.version>
<objenesis.version>3.1</objenesis.version>
@@ -340,12 +339,6 @@ flexible messaging model and an intuitive client API.</description>
<version>${mockito.version}</version>
</dependency>
- <dependency>
- <groupId>org.powermock</groupId>
- <artifactId>powermock-reflect</artifactId>
- <version>${powermock.version}</version>
- </dependency>
-
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
@@ -1331,12 +1324,6 @@ flexible messaging model and an intuitive client API.</description>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.powermock</groupId>
- <artifactId>powermock-reflect</artifactId>
- <scope>test</scope>
- </dependency>
-
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
index 67e986b804c..9c2030997fb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
@@ -106,7 +106,7 @@ public class OwnershipCache {
.thenRun(() -> {
log.info("Resource lock for {} has expired", rl.getPath());
namespaceService.unloadNamespaceBundle(namespaceBundle);
- ownedBundlesCache.synchronous().invalidate(namespaceBundle);
+ invalidateLocalOwnerCache(namespaceBundle);
namespaceService.onNamespaceBundleUnload(namespaceBundle);
});
return new OwnedBundle(namespaceBundle);
@@ -330,6 +330,10 @@ public class OwnershipCache {
this.ownedBundlesCache.synchronous().invalidateAll();
}
+ public void invalidateLocalOwnerCache(NamespaceBundle namespaceBundle) {
+ this.ownedBundlesCache.synchronous().invalidate(namespaceBundle);
+ }
+
public synchronized boolean refreshSelfOwnerInfo() {
this.selfOwnerInfo = new NamespaceEphemeralData(pulsar.getBrokerServiceUrl(),
pulsar.getBrokerServiceUrlTls(), pulsar.getSafeWebServiceAddress(),
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 12359d5c413..e5d6251d177 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -1281,6 +1281,11 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
return cursor;
}
+ @VisibleForTesting
+ public PendingAckHandle getPendingAckHandle() {
+ return pendingAckHandle;
+ }
+
public void syncBatchPositionBitSetForPendingAck(PositionImpl position) {
this.pendingAckHandle.syncBatchPositionAckSetForTransaction(position);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 0893ecf9ea8..484120cce24 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -382,6 +382,11 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
}
}
+ @VisibleForTesting
+ public AtomicLong getPendingWriteOps() {
+ return pendingWriteOps;
+ }
+
private PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor,
boolean replicated, Map<String, String> subscriptionProperties) {
checkNotNull(compactedTopic);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index 283bc038d76..a565a20b664 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.transaction.pendingack.impl;
import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.andAckSet;
import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.compareToWithAckSet;
import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.isAckSetOverlap;
+import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -1027,6 +1028,11 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
}
}
+ @VisibleForTesting
+ public Map<PositionImpl, MutablePair<PositionImpl, Integer>> getIndividualAckPositions() {
+ return individualAckPositions;
+ }
+
@Override
public boolean checkIfPendingAckStoreInit() {
return this.pendingAckStoreFuture != null && this.pendingAckStoreFuture.isDone();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java
index e26b0aa7561..b9c32e91e4c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java
@@ -37,10 +37,10 @@ import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.CachedDNSToSwitchMapping;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.bookie.rackawareness.BookieRackAffinityMapping;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
-import org.powermock.reflect.Whitebox;
import org.testng.annotations.Test;
/**
@@ -281,7 +281,7 @@ public class BookKeeperClientFactoryImplTest {
}
@Test
- public void testBookKeeperIoThreadsConfiguration() {
+ public void testBookKeeperIoThreadsConfiguration() throws Exception {
BookKeeperClientFactoryImpl factory = new BookKeeperClientFactoryImpl();
ServiceConfiguration conf = new ServiceConfiguration();
assertEquals(factory.createBkClientConfiguration(mock(MetadataStoreExtended.class), conf)
@@ -292,11 +292,11 @@ public class BookKeeperClientFactoryImplTest {
EventLoopGroup eventLoopGroup = mock(EventLoopGroup.class);
BookKeeper.Builder builder = factory.getBookKeeperBuilder(conf, eventLoopGroup,
mock(StatsLogger.class), mock(ClientConfiguration.class));
- assertEquals(Whitebox.getInternalState(builder, "eventLoopGroup"), eventLoopGroup);
+ assertEquals(FieldUtils.readField(builder, "eventLoopGroup", true), eventLoopGroup);
conf.setBookkeeperClientSeparatedIoThreadsEnabled(true);
builder = factory.getBookKeeperBuilder(conf, eventLoopGroup,
mock(StatsLogger.class), mock(ClientConfiguration.class));
- assertNull(Whitebox.getInternalState(builder, "eventLoopGroup"));
+ assertNull(FieldUtils.readField(builder, "eventLoopGroup", true));
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceCloseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceCloseTest.java
index c424132855b..1fbb40a6a56 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceCloseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceCloseTest.java
@@ -24,9 +24,9 @@ import static org.testng.AssertJUnit.assertTrue;
import java.util.concurrent.ScheduledFuture;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.loadbalance.LoadSheddingTask;
-import org.awaitility.reflect.WhiteboxImpl;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -62,18 +62,21 @@ public class PulsarServiceCloseTest extends MockedPulsarServiceBaseTest {
@Test(timeOut = 30_000)
public void closeInTimeTest() throws Exception {
LoadSheddingTask task = pulsar.getLoadSheddingTask();
- boolean isCancel = WhiteboxImpl.getInternalState(task, "isCancel");
- assertFalse(isCancel);
- ScheduledFuture<?> loadSheddingFuture = WhiteboxImpl.getInternalState(task, "future");
- assertFalse(loadSheddingFuture.isCancelled());
+
+ {
+ assertFalse((boolean) FieldUtils.readField(task, "isCancel", true));
+ ScheduledFuture<?> loadSheddingFuture = (ScheduledFuture<?>) FieldUtils.readField(task, "future", true);
+ assertFalse(loadSheddingFuture.isCancelled());
+ }
// The pulsar service is not used, so it should be closed gracefully in short time.
pulsar.close();
- isCancel = WhiteboxImpl.getInternalState(task, "isCancel");
- assertTrue(isCancel);
- loadSheddingFuture = WhiteboxImpl.getInternalState(task, "future");
- assertTrue(loadSheddingFuture.isCancelled());
+ {
+ assertTrue((boolean) FieldUtils.readField(task, "isCancel", true));
+ ScheduledFuture<?> loadSheddingFuture = (ScheduledFuture<?>) FieldUtils.readField(task, "future", true);
+ assertTrue(loadSheddingFuture.isCancelled());
+ }
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java
index 2a97bc4f8f2..d9199586181 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java
@@ -22,7 +22,6 @@ import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
-import java.lang.reflect.Field;
import java.util.Collection;
import java.util.Date;
import java.util.Map;
@@ -32,13 +31,11 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.TimeoutHandler;
-import javax.ws.rs.core.UriInfo;
import org.apache.pulsar.broker.admin.v2.PersistentTopics;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
-import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
@@ -51,26 +48,16 @@ import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Test(groups = "broker-admin")
public class AdminApiGetLastMessageIdTest extends MockedPulsarServiceBaseTest {
- private PersistentTopics persistentTopics;
- private final String testTenant = "my-tenant";
- private final String testLocalCluster = "use";
- private final String testNamespace = "my-namespace";
- protected Field uriField;
- protected UriInfo uriInfo;
+ private static final String testTenant = "my-tenant";
+ private static final String testNamespace = "my-namespace";
- @BeforeClass
- public void initPersistentTopics() throws Exception {
- uriField = PulsarWebResource.class.getDeclaredField("uri");
- uriField.setAccessible(true);
- uriInfo = mock(UriInfo.class);
- }
+ private PersistentTopics persistentTopics;
@Override
@BeforeMethod
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index de555c1715d..3a9bd21245b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.admin;
-import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyString;
@@ -94,7 +93,6 @@ import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.zookeeper.KeeperException;
import org.awaitility.Awaitility;
import org.mockito.ArgumentCaptor;
-import org.powermock.reflect.Whitebox;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
@@ -149,9 +147,8 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
PulsarResources resources =
spy(new PulsarResources(pulsar.getLocalMetadataStore(), pulsar.getConfigurationMetadataStore()));
- doReturn(spyWithClassAndConstructorArgs(TopicResources.class, pulsar.getLocalMetadataStore())).when(resources)
- .getTopicResources();
- Whitebox.setInternalState(pulsar, "pulsarResources", resources);
+ doReturn(spy(new TopicResources(pulsar.getLocalMetadataStore()))).when(resources).getTopicResources();
+ doReturn(resources).when(pulsar).getPulsarResources();
admin.clusters().createCluster("use", ClusterData.builder().serviceUrl("http://broker-use.com:8080").build());
admin.clusters().createCluster("test", ClusterData.builder().serviceUrl("http://broker-use.com:8080").build());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
index 7bd15992f64..e8c0683569f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
@@ -38,9 +38,9 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.LookupService;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
-import org.powermock.reflect.Whitebox;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -122,15 +122,14 @@ public class TopicAutoCreationTest extends ProducerConsumerBase {
});
- LookupService original = Whitebox.getInternalState(pulsarClient, "lookup");
+ LookupService original = ((PulsarClientImpl) pulsarClient).getLookup();
try {
// we want to skip the "lookup" phase, because it is blocked by the HTTP API
LookupService mockLookup = mock(LookupService.class);
- Whitebox.setInternalState(pulsarClient, "lookup", mockLookup);
- when(mockLookup.getPartitionedTopicMetadata(any())).thenAnswer(i -> {
- return CompletableFuture.completedFuture(new PartitionedTopicMetadata(0));
- });
+ ((PulsarClientImpl) pulsarClient).setLookup(mockLookup);
+ when(mockLookup.getPartitionedTopicMetadata(any())).thenAnswer(
+ i -> CompletableFuture.completedFuture(new PartitionedTopicMetadata(0)));
when(mockLookup.getBroker(any())).thenAnswer(i -> {
InetSocketAddress brokerAddress =
new InetSocketAddress(pulsar.getAdvertisedAddress(), pulsar.getBrokerListenPort().get());
@@ -139,20 +138,20 @@ public class TopicAutoCreationTest extends ProducerConsumerBase {
// Creating a producer and creating a Consumer may trigger automatic topic
// creation, let's try to create a Producer and a Consumer
- try (Producer<byte[]> producer = pulsarClient.newProducer()
+ try (Producer<byte[]> ignored = pulsarClient.newProducer()
.sendTimeout(1, TimeUnit.SECONDS)
.topic(topic)
- .create();) {
+ .create()) {
} catch (PulsarClientException.LookupException expected) {
String msg = "Namespace bundle for topic (%s) not served by this instance";
log.info("Expected error", expected);
assertTrue(expected.getMessage().contains(String.format(msg, topic)));
}
- try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ try (Consumer<byte[]> ignored = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("test")
- .subscribe();) {
+ .subscribe()) {
} catch (PulsarClientException.LookupException expected) {
String msg = "Namespace bundle for topic (%s) not served by this instance";
log.info("Expected error", expected);
@@ -170,17 +169,16 @@ public class TopicAutoCreationTest extends ProducerConsumerBase {
admin.topics().getList(namespaceName).isEmpty();
// create now the topic using auto creation
- Whitebox.setInternalState(pulsarClient, "lookup", original);
-
- try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ ((PulsarClientImpl) pulsarClient).setLookup(original);
+ try (Consumer<byte[]> ignored = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("test")
- .subscribe();) {
+ .subscribe()) {
}
admin.topics().getList(namespaceName).contains(topic);
} finally {
- Whitebox.setInternalState(pulsarClient, "lookup", original);
+ ((PulsarClientImpl) pulsarClient).setLookup(original);
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
index e7dfff6de15..49ead1c5fcb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
@@ -49,6 +49,7 @@ import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonEncoder;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.avro.util.Utf8;
+import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
@@ -88,7 +89,6 @@ import org.apache.pulsar.websocket.data.ProducerMessages;
import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-import org.powermock.reflect.Whitebox;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -320,7 +320,7 @@ public class TopicsTest extends MockedPulsarServiceBaseTest {
doReturn(false).when(topics).isRequestHttps();
UriInfo uriInfo = mock(UriInfo.class);
doReturn(requestPath).when(uriInfo).getRequestUri();
- Whitebox.setInternalState(topics, "uri", uriInfo);
+ FieldUtils.writeField(topics, "uri", uriInfo, true);
//do produce on another broker
topics.setPulsar(pulsar2);
AsyncResponse asyncResponse = mock(AsyncResponse.class);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
index 8be88350ea6..5d62ec2c58c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.loadbalance;
-import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -26,7 +25,6 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import java.lang.reflect.Field;
-import java.lang.reflect.Method;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
@@ -35,13 +33,17 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import lombok.SneakyThrows;
import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.commons.lang3.reflect.MethodUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.impl.PulsarResourceDescription;
@@ -68,7 +70,8 @@ import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.awaitility.Awaitility;
-import org.powermock.reflect.Whitebox;
+import org.mockito.MockedConstruction;
+import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -92,12 +95,12 @@ public class LoadBalancerTest {
private static final int MAX_RETRIES = 15;
private static final int BROKER_COUNT = 5;
- private int[] brokerWebServicePorts = new int[BROKER_COUNT];
- private int[] brokerNativeBrokerPorts = new int[BROKER_COUNT];
- private URL[] brokerUrls = new URL[BROKER_COUNT];
- private String[] lookupAddresses = new String[BROKER_COUNT];
- private PulsarService[] pulsarServices = new PulsarService[BROKER_COUNT];
- private PulsarAdmin[] pulsarAdmins = new PulsarAdmin[BROKER_COUNT];
+ private final int[] brokerWebServicePorts = new int[BROKER_COUNT];
+ private final int[] brokerNativeBrokerPorts = new int[BROKER_COUNT];
+ private final URL[] brokerUrls = new URL[BROKER_COUNT];
+ private final String[] lookupAddresses = new String[BROKER_COUNT];
+ private final PulsarService[] pulsarServices = new PulsarService[BROKER_COUNT];
+ private final PulsarAdmin[] pulsarAdmins = new PulsarAdmin[BROKER_COUNT];
@BeforeMethod
void setup() throws Exception {
@@ -222,7 +225,7 @@ public class LoadBalancerTest {
}
/*
- * tests rankings get updated when we write write the new load reports to the zookeeper on loadbalance root node
+ * tests rankings get updated when we write the new load reports to the zookeeper on load-balance root node
* tests writing pre-configured load report on the zookeeper translates the pre-calculated rankings
*/
@Test
@@ -237,18 +240,15 @@ public class LoadBalancerTest {
sru.setCpu(new ResourceUsage(5, 400));
lr.setSystemResourceUsage(sru);
- Whitebox.setInternalState(pulsarServices[0].getLoadManager().get(), "lastLoadReport", lr);
- ResourceLock<LoadReport> lock = Whitebox.getInternalState(pulsarServices[i].getLoadManager().get(),
- "brokerLock");
- lock.updateValue(lr).join();
+ FieldUtils.writeField(pulsarServices[0].getLoadManager().get(), "lastLoadReport", lr, true);
+ updateLastReport(pulsarServices[i].getLoadManager().get(), lr);
}
for (int i = 0; i < BROKER_COUNT; i++) {
- Method updateRanking = Whitebox.getMethod(SimpleLoadManagerImpl.class, "updateRanking");
- updateRanking.invoke(pulsarServices[0].getLoadManager().get());
+ MethodUtils.invokeMethod(pulsarServices[0].getLoadManager().get(), true, "updateRanking");
}
- // do lookup for bunch of bundles
+ // do lookup for a bunch of bundles
int totalNamespaces = 200;
Map<String, Integer> namespaceOwner = new HashMap<>();
for (int i = 0; i < totalNamespaces; i++) {
@@ -275,6 +275,13 @@ public class LoadBalancerTest {
}
}
+ @SuppressWarnings("unchecked")
+ @SneakyThrows
+ private void updateLastReport(LoadManager lm, LoadReport lr){
+ ResourceLock<LoadReport> lock = (ResourceLock<LoadReport>) FieldUtils.readField(lm, "brokerLock", true);
+ lock.updateValue(lr).join();
+ }
+
private AtomicReference<Map<Long, Set<ResourceUnit>>> getSortedRanking(PulsarService pulsar)
throws NoSuchFieldException, IllegalAccessException {
Field ranking = ((SimpleLoadManagerImpl) pulsar.getLoadManager().get()).getClass()
@@ -312,17 +319,14 @@ public class LoadBalancerTest {
sru.setCpu(new ResourceUsage(60, 400));
lr.setSystemResourceUsage(sru);
- ResourceLock<LoadReport> lock = Whitebox.getInternalState(pulsarServices[i].getLoadManager().get(),
- "brokerLock");
- lock.updateValue(lr).join();
+ updateLastReport(pulsarServices[i].getLoadManager().get(), lr);
}
for (int i = 0; i < BROKER_COUNT; i++) {
- Method method = Whitebox.getMethod(SimpleLoadManagerImpl.class, "getUpdateRankingHandle");
LoadManager loadManager = pulsarServices[i].getLoadManager().get();
Awaitility.await().until(() -> {
- Object invoke = method.invoke(loadManager);
- return invoke != null && ((Future) invoke).isDone();
+ Future<?> f = ((SimpleLoadManagerImpl) loadManager).getUpdateRankingHandle();
+ return f != null && f.isDone();
});
}
@@ -376,15 +380,12 @@ public class LoadBalancerTest {
}
lr.setBundleStats(bundleStats);
- Whitebox.setInternalState(pulsarServices[0].getLoadManager().get(), "lastLoadReport", lr);
- ResourceLock<LoadReport> lock = Whitebox.getInternalState(pulsarServices[i].getLoadManager().get(),
- "brokerLock");
- lock.updateValue(lr).join();
+ FieldUtils.writeField(pulsarServices[0].getLoadManager().get(), "lastLoadReport", lr, true);
+ updateLastReport(pulsarServices[i].getLoadManager().get(), lr);
}
for (int i = 0; i < BROKER_COUNT; i++) {
- Method updateRanking = Whitebox.getMethod(SimpleLoadManagerImpl.class, "updateRanking");
- updateRanking.invoke(pulsarServices[0].getLoadManager().get());
+ MethodUtils.invokeMethod(pulsarServices[0].getLoadManager().get(), true, "updateRanking");
}
// print ranking
@@ -469,10 +470,7 @@ public class LoadBalancerTest {
bundleStats.put(bundleName, stats);
}
lr.setBundleStats(bundleStats);
-
- ResourceLock<LoadReport> lock = Whitebox.getInternalState(pulsarServices[i].getLoadManager().get(),
- "brokerLock");
- lock.updateValue(lr).join();
+ updateLastReport(pulsarServices[i].getLoadManager().get(), lr);
}
}
@@ -597,8 +595,12 @@ public class LoadBalancerTest {
}
// fake Namespaces Admin
- NamespacesImpl namespaceAdmin = mock(NamespacesImpl.class);
- Whitebox.setInternalState(pulsarServices[0].getAdminClient(), "namespaces", namespaceAdmin);
+ CompletableFuture<NamespacesImpl> namespaceAdminFuture = new CompletableFuture<>();
+ try (MockedConstruction<NamespacesImpl> ignore = Mockito.mockConstruction(
+ NamespacesImpl.class, (allocator, context) -> namespaceAdminFuture.complete(allocator))) {
+ pulsarServices[0].getAdminClient();
+ }
+ NamespacesImpl namespaceAdmin = namespaceAdminFuture.get();
// create load report
// namespace 01~09 need to be split
@@ -631,10 +633,8 @@ public class LoadBalancerTest {
newBundleStats(maxTopics + 1, 0, 0, 0, 0, 0, 0));
lr.setBundleStats(bundleStats);
- Whitebox.setInternalState(pulsarServices[0].getLoadManager().get(), "lastLoadReport", lr);
- ResourceLock<LoadReport> lock = Whitebox.getInternalState(pulsarServices[0].getLoadManager().get(),
- "brokerLock");
- lock.updateValue(lr).join();
+ FieldUtils.writeField(pulsarServices[0].getLoadManager().get(), "lastLoadReport", lr, true);
+ updateLastReport(pulsarServices[0].getLoadManager().get(), lr);
// sleep to wait load ranking be triggered and trigger bundle split
Thread.sleep(5000);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
index a889bedb7c1..9b7ea315f59 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
@@ -85,7 +85,6 @@ import org.apache.pulsar.policies.data.loadbalancer.TimeAverageMessageData;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
-import org.powermock.reflect.Whitebox;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -450,20 +449,16 @@ public class ModularLoadManagerImplTest {
/**
* It verifies that deletion of broker-znode on broker-stop will invalidate availableBrokerCache list
- *
- * @throws Exception
*/
@Test
public void testBrokerStopCacheUpdate() throws Exception {
ModularLoadManagerWrapper loadManagerWrapper = (ModularLoadManagerWrapper) pulsar1.getLoadManager().get();
- ModularLoadManagerImpl lm = Whitebox.getInternalState(loadManagerWrapper, "loadManager");
+ ModularLoadManagerImpl lm = (ModularLoadManagerImpl) loadManagerWrapper.getLoadManager();
assertEquals(lm.getAvailableBrokers().size(), 2);
pulsar2.close();
- Awaitility.await().untilAsserted(() -> {
- assertEquals(lm.getAvailableBrokers().size(), 1);
- });
+ Awaitility.await().untilAsserted(() -> assertEquals(lm.getAvailableBrokers().size(), 1));
}
/**
@@ -481,8 +476,6 @@ public class ModularLoadManagerImplTest {
* b. available-brokers: broker2, broker3 => result: broker2
* c. available-brokers: broker3 => result: NULL
* </pre>
- *
- * @throws Exception
*/
@Test
public void testNamespaceIsolationPoliciesForPrimaryAndSecondaryBrokers() throws Exception {
@@ -664,27 +657,20 @@ public class ModularLoadManagerImplTest {
@Test
public void testRemoveDeadBrokerTimeAverageData() throws Exception {
ModularLoadManagerWrapper loadManagerWrapper = (ModularLoadManagerWrapper) pulsar1.getLoadManager().get();
- ModularLoadManagerImpl lm = Whitebox.getInternalState(loadManagerWrapper, "loadManager");
+ ModularLoadManagerImpl lm = (ModularLoadManagerImpl) loadManagerWrapper.getLoadManager();
assertEquals(lm.getAvailableBrokers().size(), 2);
pulsar2.close();
- Awaitility.await().untilAsserted(() -> {
- assertEquals(lm.getAvailableBrokers().size(), 1);
- });
+ Awaitility.await().untilAsserted(() -> assertEquals(lm.getAvailableBrokers().size(), 1));
lm.updateAll();
List<String> data = pulsar1.getLocalMetadataStore()
- .getMetadataCache(TimeAverageBrokerData.class).getChildren(TIME_AVERAGE_BROKER_ZPATH).join();
-
- Awaitility.await().untilAsserted(() -> {
- assertTrue(pulsar1.getLeaderElectionService().isLeader());
- });
+ .getMetadataCache(TimeAverageBrokerData.class)
+ .getChildren(TIME_AVERAGE_BROKER_ZPATH)
+ .join();
+ Awaitility.await().untilAsserted(() -> assertTrue(pulsar1.getLeaderElectionService().isLeader()));
assertEquals(data.size(), 1);
-
-
-
-
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index 6f5462c1cea..cb806d3ccfe 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -49,6 +49,7 @@ import java.util.concurrent.atomic.AtomicReference;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.loadbalance.LoadData;
import org.apache.pulsar.broker.loadbalance.LoadManager;
@@ -87,7 +88,6 @@ import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.policies.data.loadbalancer.BundleData;
import org.awaitility.Awaitility;
import org.mockito.stubbing.Answer;
-import org.powermock.reflect.Whitebox;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -822,11 +822,10 @@ public class NamespaceServiceTest extends BrokerTestBase {
// Wait until "ModularLoadManager" completes processing the ZK notification.
ModularLoadManagerWrapper modularLoadManagerWrapper = (ModularLoadManagerWrapper) loadManager;
ModularLoadManagerImpl modularLoadManager = (ModularLoadManagerImpl) modularLoadManagerWrapper.getLoadManager();
- ScheduledExecutorService scheduler = Whitebox.getInternalState(modularLoadManager, "scheduler");
+ ScheduledExecutorService scheduler = (ScheduledExecutorService) FieldUtils.readField(
+ modularLoadManager, "scheduler", true);
CompletableFuture<Void> waitForNoticeHandleFinishByLoadManager = new CompletableFuture<>();
- scheduler.execute(() -> {
- waitForNoticeHandleFinishByLoadManager.complete(null);
- });
+ scheduler.execute(() -> waitForNoticeHandleFinishByLoadManager.complete(null));
waitForNoticeHandleFinishByLoadManager.join();
// Manually trigger "LoadResourceQuotaUpdaterTask"
loadManager.writeResourceQuotasToZooKeeper();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
index 604abd8d709..124372ea15a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
@@ -39,7 +39,6 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.awaitility.Awaitility;
-import org.powermock.reflect.Whitebox;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -294,17 +293,13 @@ public class ExclusiveProducerTest extends BrokerTestBase {
// Simulate a producer that takes over and fences p1 through the topic epoch
if (!partitioned) {
Topic t = pulsar.getBrokerService().getTopic(topic, false).get().get();
- CompletableFuture<?> f = (CompletableFuture<?>) Whitebox
- .getMethod(AbstractTopic.class, "incrementTopicEpoch", Optional.class)
- .invoke(t, Optional.of(0L));
+ CompletableFuture<?> f = ((AbstractTopic) t).incrementTopicEpoch(Optional.of(0L));
f.get();
} else {
for (int i = 0; i < 3; i++) {
String name = TopicName.get(topic).getPartition(i).toString();
Topic t = pulsar.getBrokerService().getTopic(name, false).get().get();
- CompletableFuture<?> f = (CompletableFuture<?>) Whitebox
- .getMethod(AbstractTopic.class, "incrementTopicEpoch", Optional.class)
- .invoke(t, Optional.of(0L));
+ CompletableFuture<?> f = ((AbstractTopic) t).incrementTopicEpoch(Optional.of(0L));
f.get();
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
index d8dadaf8b5b..d8485f3f051 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
@@ -19,11 +19,8 @@
package org.apache.pulsar.broker.service;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
-
-import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.google.common.collect.Sets;
import java.util.LinkedHashMap;
import java.util.List;
@@ -31,15 +28,14 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import lombok.Cleanup;
-import org.apache.commons.lang3.mutable.MutableBoolean;
+import lombok.SneakyThrows;
import org.apache.commons.lang3.mutable.MutableObject;
-import org.apache.jute.Record;
+import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.namespace.NamespaceService;
-import org.apache.pulsar.broker.namespace.OwnedBundle;
import org.apache.pulsar.broker.namespace.OwnershipCache;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
@@ -48,16 +44,8 @@ import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
-import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
-import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.proto.ReplyHeader;
-import org.apache.zookeeper.server.Request;
-import org.apache.zookeeper.server.ServerCnxn;
-import org.apache.zookeeper.server.ZooKeeperServer;
import org.mockito.stubbing.Answer;
-import org.powermock.reflect.Whitebox;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -136,6 +124,7 @@ public class TopicOwnerTest {
}
@SuppressWarnings("unchecked")
+ @SneakyThrows(IllegalAccessException.class)
private MutableObject<PulsarService> spyLeaderNamespaceServiceForAuthorizedBroker() {
// Spy leader namespace service to inject authorized broker for namespace-bundle from leader,
// this is a safe operation since it is just an recommendation if namespace-bundle has no owner
@@ -157,85 +146,10 @@ public class TopicOwnerTest {
return CompletableFuture.completedFuture(Optional.of(lookupResult));
};
doAnswer(answer).when(spyLeaderNamespaceService).getBrokerServiceUrlAsync(any(TopicName.class), any(LookupOptions.class));
- Whitebox.setInternalState(leaderPulsar, "nsService", spyLeaderNamespaceService);
+ FieldUtils.writeField(leaderPulsar, "nsService", spyLeaderNamespaceService, true);
return leaderAuthorizedBroker;
}
- private CompletableFuture<Void> watchMetadataStoreReconnect(MetadataStoreExtended store) {
- CompletableFuture<Void> reconnectedFuture = new CompletableFuture<>();
- store.registerSessionListener(event -> {
- if (event == SessionEvent.Reconnected || event == SessionEvent.SessionReestablished) {
- reconnectedFuture.complete(null);
- }
- });
-
- return reconnectedFuture;
- }
-
- @FunctionalInterface
- interface RequestMatcher {
- boolean match(Request request) throws Exception;
- }
-
- private void spyZookeeperToDisconnectBeforePersist(ZooKeeper zooKeeper, RequestMatcher matcher) {
- ZooKeeperServer zooKeeperServer = bkEnsemble.getZkServer();
- ServerCnxn zkServerConnection = bkEnsemble.getZookeeperServerConnection(zooKeeper);
- ZooKeeperServer spyZooKeeperServer = spy(zooKeeperServer);
-
- // Spy zk server connection to close connection to cause connection loss after namespace-bundle
- // deleted successfully. This mimics crash of connected zk server after committing requested operation.
- Whitebox.setInternalState(zkServerConnection, "zkServer", spyZooKeeperServer);
- doAnswer(invocation -> {
- Request request = invocation.getArgument(0);
- if (request.sessionId != zooKeeper.getSessionId()) {
- return invocation.callRealMethod();
- }
- if (!matcher.match(request)) {
- return invocation.callRealMethod();
- }
- Whitebox.setInternalState(zkServerConnection, "zkServer", zooKeeperServer);
- bkEnsemble.disconnectZookeeper(zooKeeper);
- return null;
- }).when(spyZooKeeperServer).submitRequest(any(Request.class));
- }
-
- private void spyZookeeperToDisconnectAfterPersist(ZooKeeper zooKeeper, RequestMatcher matcher) {
- ZooKeeperServer zooKeeperServer = bkEnsemble.getZkServer();
- ServerCnxn zkServerConnection = bkEnsemble.getZookeeperServerConnection(zooKeeper);
- ZooKeeperServer spyZooKeeperServer = spy(zooKeeperServer);
-
- // Spy zk server connection to close connection to cause connection loss after namespace-bundle
- // deleted successfully. This mimics crash of connected zk server after committing requested operation.
- Whitebox.setInternalState(zkServerConnection, "zkServer", spyZooKeeperServer);
- MutableBoolean disconnected = new MutableBoolean();
- doAnswer(invocation -> {
- Request request = invocation.getArgument(0);
- if (request.sessionId != zooKeeper.getSessionId()) {
- return invocation.callRealMethod();
- }
- if (!matcher.match(request)) {
- return invocation.callRealMethod();
- }
-
- ServerCnxn spyZkServerConnection1 = spy(zkServerConnection);
- doAnswer(responseInvocation -> {
- synchronized (disconnected) {
- ReplyHeader replyHeader = responseInvocation.getArgument(0);
- if (replyHeader.getXid() == request.cxid && replyHeader.getErr() == 0) {
- Whitebox.setInternalState(zkServerConnection, "zkServer", zooKeeperServer);
- disconnected.setTrue();
- bkEnsemble.disconnectZookeeper(zooKeeper);
- } else if (disconnected.isFalse()) {
- return responseInvocation.callRealMethod();
- }
- return null;
- }
- }).when(spyZkServerConnection1).sendResponse(any(ReplyHeader.class), nullable(Record.class), any(String.class));
- Whitebox.setInternalState(request, "cnxn", spyZkServerConnection1);
- return invocation.callRealMethod();
- }).when(spyZooKeeperServer).submitRequest(any(Request.class));
- }
-
@Test
public void testReestablishOwnershipAfterInvalidateCache() throws Exception {
String topic1 = "persistent://my-tenant/my-ns/topic-1";
@@ -254,12 +168,11 @@ public class TopicOwnerTest {
Assert.assertEquals(pulsarAdmins[4].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl());
OwnershipCache ownershipCache1 = pulsar1.getNamespaceService().getOwnershipCache();
- AsyncLoadingCache<NamespaceBundle, OwnedBundle> ownedBundlesCache1 = Whitebox.getInternalState(ownershipCache1, "ownedBundlesCache");
leaderAuthorizedBroker.setValue(null);
Assert.assertNotNull(ownershipCache1.getOwnedBundle(namespaceBundle));
- ownedBundlesCache1.synchronous().invalidate(namespaceBundle);
+ ownershipCache1.invalidateLocalOwnerCache(namespaceBundle);
Assert.assertNull(ownershipCache1.getOwnedBundle(namespaceBundle));
// pulsar1 is still owner in zk.
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
index 72435aa265e..6ad17b7d27e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
@@ -27,6 +27,7 @@ import lombok.Cleanup;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.ManagedCursorMXBean;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics;
@@ -41,7 +42,6 @@ import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.PulsarTestClient;
import org.apache.pulsar.common.stats.Metrics;
import org.awaitility.Awaitility;
-import org.powermock.reflect.Whitebox;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -171,7 +171,8 @@ public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest {
producer.send(message.getBytes());
consumer.acknowledge(consumer.receive().getMessageId());
// Make BK error.
- LedgerHandle ledgerHandle = Whitebox.getInternalState(managedCursor, "cursorLedger");
+ LedgerHandle ledgerHandle = (LedgerHandle) FieldUtils.readField(
+ managedCursor, "cursorLedger", true);
ledgerHandle.close();
return managedCursorMXBean.getPersistLedgerErrors() > 0
&& managedCursorMXBean.getPersistZookeeperSucceed() > 0;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index 4e03f839213..b13c8ff2cb2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -19,12 +19,19 @@
package org.apache.pulsar.broker.systopic;
import com.google.common.collect.Sets;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.lang3.reflect.MethodUtils;
import org.apache.pulsar.broker.admin.impl.BrokersBase;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerTestBase;
@@ -33,6 +40,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.ListTopicsOptions;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
@@ -40,26 +48,18 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
-import org.apache.pulsar.common.policies.data.BacklogQuota;
-import org.apache.pulsar.common.policies.data.TenantInfo;
-import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.TopicVersion;
+import org.apache.pulsar.common.policies.data.BacklogQuota;
+import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
-import org.powermock.reflect.Whitebox;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
@Test(groups = "broker")
public class PartitionedSystemTopicTest extends BrokerTestBase {
@@ -207,7 +207,10 @@ public class PartitionedSystemTopicTest extends BrokerTestBase {
config.setMinimumRolloverTime(1, TimeUnit.SECONDS);
config.setMaximumRolloverTime(1, TimeUnit.SECONDS);
persistentTopic.getManagedLedger().setConfig(config);
- Whitebox.invokeMethod(persistentTopic.getManagedLedger(), "updateLastLedgerCreatedTimeAndScheduleRolloverTask");
+ MethodUtils.invokeMethod(
+ persistentTopic.getManagedLedger(),
+ true,
+ "updateLastLedgerCreatedTimeAndScheduleRolloverTask");
String msg1 = "msg-1";
producer.send(msg1);
Thread.sleep(3 * 1000);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index d0674721c00..ffc351e9413 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -60,6 +60,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Cleanup;
+import lombok.Lombok;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.util.Bytes;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -71,6 +72,8 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.commons.lang3.reflect.MethodUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.CounterBrokerInterceptor;
@@ -138,7 +141,6 @@ import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig
import org.awaitility.Awaitility;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-import org.powermock.reflect.Whitebox;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -1244,8 +1246,8 @@ public class TransactionTest extends TransactionTestBase {
for (int i = 0; i < 10; i++) {
producer.newMessage().value(Bytes.toBytes(i)).send();
}
- ClientCnx cnx = Whitebox.invokeMethod(consumer, "cnx");
- Whitebox.invokeMethod(consumer, "connectionClosed", cnx);
+ ClientCnx cnx = (ClientCnx) MethodUtils.invokeMethod(consumer, true, "cnx");
+ MethodUtils.invokeMethod(consumer, true, "connectionClosed", cnx);
Message<byte[]> message = consumer.receive();
Transaction transaction = pulsarClient
@@ -1391,39 +1393,37 @@ public class TransactionTest extends TransactionTestBase {
*/
@Test
public void testTBRecoverChangeStateError() throws InterruptedException, TimeoutException {
- final AtomicReference<PersistentTopic> persistentTopic = new AtomicReference<PersistentTopic>();
- AtomicInteger atomicInteger = new AtomicInteger(1);
+ final AtomicReference<PersistentTopic> persistentTopic = new AtomicReference<>();
// Create Executor
- ScheduledExecutorService executorService_recover = mock(ScheduledExecutorService.class);
+ ScheduledExecutorService executorServiceRecover = mock(ScheduledExecutorService.class);
// Mock serviceConfiguration.
ServiceConfiguration serviceConfiguration = mock(ServiceConfiguration.class);
when(serviceConfiguration.isEnableReplicatedSubscriptions()).thenReturn(false);
when(serviceConfiguration.isTransactionCoordinatorEnabled()).thenReturn(true);
// Mock executorProvider.
ExecutorProvider executorProvider = mock(ExecutorProvider.class);
- when(executorProvider.getExecutor(any(Object.class))).thenReturn(executorService_recover);
+ when(executorProvider.getExecutor(any(Object.class))).thenReturn(executorServiceRecover);
// Mock pendingAckStore.
PendingAckStore pendingAckStore = mock(PendingAckStore.class);
- doAnswer(new Answer() {
- @Override
- public Object answer(InvocationOnMock invocation) throws Throwable {
- new Thread(() -> {
- TopicTransactionBuffer.TopicTransactionBufferRecover recover
- = (TopicTransactionBuffer.TopicTransactionBufferRecover)invocation.getArguments()[0];
- TopicTransactionBufferRecoverCallBack callBack
- = Whitebox.getInternalState(recover, "callBack");;
- try {
- persistentTopic.get().getTransactionBuffer().closeAsync().get();
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- } catch (ExecutionException e) {
- throw new RuntimeException(e);
+ doAnswer(invocation -> {
+ new Thread(() -> {
+ TopicTransactionBuffer.TopicTransactionBufferRecover recover
+ = (TopicTransactionBuffer.TopicTransactionBufferRecover) invocation.getArguments()[0];
+ TopicTransactionBufferRecoverCallBack callBack = null;
+ try {
+ callBack = (TopicTransactionBufferRecoverCallBack) FieldUtils.readField(
+ recover, "callBack", true);
+ persistentTopic.get().getTransactionBuffer().closeAsync().get();
+ } catch (Exception e) {
+ throw Lombok.sneakyThrow(e);
+ } finally {
+ if (callBack != null) {
+ callBack.recoverComplete();
}
- callBack.recoverComplete();
- }).start();
- return null;
- }
- }).when(executorService_recover).execute(any());
+ }
+ }).start();
+ return null;
+ }).when(executorServiceRecover).execute(any());
// Mock executorProvider.
TransactionPendingAckStoreProvider pendingAckStoreProvider = mock(TransactionPendingAckStoreProvider.class);
when(pendingAckStoreProvider.checkInitializedBefore(any()))
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
index 576ef647248..ba7b8b6b40f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.transaction.buffer;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.client.api.Producer;
@@ -29,7 +30,6 @@ import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.awaitility.Awaitility;
-import org.powermock.reflect.Whitebox;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -83,7 +83,7 @@ public class TopicTransactionBufferTest extends TransactionTestBase {
producer.newMessage(txn).value("test".getBytes()).send();
PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0)
.getBrokerService().getTopic(TopicName.get(topic).toString(), false).get().get();
- Whitebox.setInternalState(persistentTopic.getManagedLedger(), "state", ManagedLedgerImpl.State.WriteFailed);
+ FieldUtils.writeField(persistentTopic.getManagedLedger(), "state", ManagedLedgerImpl.State.WriteFailed, true);
txn.commit().get();
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
index 4cc29f396cf..0922c9b801b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
@@ -29,7 +29,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -61,7 +60,6 @@ import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.awaitility.Awaitility;
-import org.powermock.reflect.Whitebox;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -324,9 +322,11 @@ public class TransactionLowWaterMarkTest extends TransactionTestBase {
field.setAccessible(true);
field.set(txn1, TransactionImpl.State.OPEN);
- AtomicLong pendingWriteOps = Whitebox.getInternalState(getPulsarServiceList().get(0)
- .getBrokerService().getTopic(TopicName.get(TOPIC).toString(),
- false).get().get(), "pendingWriteOps");
+ PersistentTopic t = (PersistentTopic) getPulsarServiceList().get(0)
+ .getBrokerService()
+ .getTopic(TopicName.get(TOPIC).toString(), false)
+ .get()
+ .orElseThrow();
try {
producer.newMessage(txn1).send();
fail();
@@ -334,7 +334,7 @@ public class TransactionLowWaterMarkTest extends TransactionTestBase {
// no-op
}
- assertEquals(pendingWriteOps.get(), 0);
+ assertEquals(t.getPendingWriteOps().get(), 0);
}
@Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java
index c35d15d96da..58be651a7d1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java
@@ -19,6 +19,15 @@
package org.apache.pulsar.broker.transaction.pendingack;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
@@ -40,23 +49,11 @@ import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
-import org.powermock.reflect.Whitebox;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import java.lang.reflect.Field;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.TimeUnit;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-
@Slf4j
@Test(groups = "broker")
public class PendingAckInMemoryDeleteTest extends TransactionTestBase {
@@ -309,12 +306,15 @@ public class PendingAckInMemoryDeleteTest extends TransactionTestBase {
consumer.acknowledgeAsync(consumer.receive().getMessageId(), commitTxn).get();
- PendingAckHandle pendingAckHandle = Whitebox.getInternalState(getPulsarServiceList().get(0)
- .getBrokerService().getTopic("persistent://" + normalTopic, false).get().get()
- .getSubscription(subscriptionName), "pendingAckHandle");
-
+ Topic t = getPulsarServiceList().get(0)
+ .getBrokerService()
+ .getTopic("persistent://" + normalTopic, false)
+ .get()
+ .orElseThrow();
+ PersistentSubscription subscription = (PersistentSubscription) t.getSubscription(subscriptionName);
+ PendingAckHandleImpl pendingAckHandle = (PendingAckHandleImpl) subscription.getPendingAckHandle();
Map<PositionImpl, MutablePair<PositionImpl, Integer>> individualAckPositions =
- Whitebox.getInternalState(pendingAckHandle, "individualAckPositions");
+ pendingAckHandle.getIndividualAckPositions();
// one message in pending ack state
assertEquals(1, individualAckPositions.size());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 2f572a841b0..669161f67c0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -23,6 +23,7 @@ import static org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
@@ -75,10 +76,14 @@ import lombok.Data;
import lombok.EqualsAndHashCode;
import org.apache.avro.Schema.Parser;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.cache.EntryCache;
+import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
import org.apache.commons.lang3.RandomUtils;
+import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.schema.GenericRecord;
@@ -104,7 +109,6 @@ import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
-import org.powermock.reflect.Whitebox;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -1032,6 +1036,20 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
}
+ @Override
+ protected void beforePulsarStartMocks(PulsarService pulsar) throws Exception {
+ super.beforePulsarStartMocks(pulsar);
+ doAnswer(i0 -> {
+ ManagedLedgerFactory factory = (ManagedLedgerFactory) spy(i0.callRealMethod());
+ doAnswer(i1 -> {
+ EntryCacheManager manager = (EntryCacheManager) spy(i1.callRealMethod());
+ doAnswer(i2 -> spy(i2.callRealMethod())).when(manager).getEntryCache(any());
+ return manager;
+ }).when(factory).getEntryCacheManager();
+ return factory;
+ }).when(pulsar).getManagedLedgerFactory();
+ }
+
/**
* Usecase 1: Only 1 Active Subscription - 1 subscriber - Produce Messages - EntryCache should cache messages -
* EntryCache should be cleaned : Once active subscription consumes messages
@@ -1068,8 +1086,8 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get();
ManagedLedgerImpl ledger = (ManagedLedgerImpl) topicRef.getManagedLedger();
- EntryCache entryCache = spy((EntryCache) Whitebox.getInternalState(ledger, "entryCache"));
- Whitebox.setInternalState(ledger, "entryCache", entryCache);
+
+ EntryCache entryCache = (EntryCache) FieldUtils.readField(ledger, "entryCache", true);
Message<byte[]> msg;
// 2. Produce messages
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
index c241b6e6cc7..4f9c9080646 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.api.v1;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
@@ -47,8 +48,12 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import lombok.Cleanup;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.cache.EntryCache;
+import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
@@ -71,7 +76,6 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
-import org.powermock.reflect.Whitebox;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -608,6 +612,20 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
}
}
+ @Override
+ protected void beforePulsarStartMocks(PulsarService pulsar) throws Exception {
+ super.beforePulsarStartMocks(pulsar);
+ doAnswer(i0 -> {
+ ManagedLedgerFactory factory = (ManagedLedgerFactory) spy(i0.callRealMethod());
+ doAnswer(i1 -> {
+ EntryCacheManager manager = (EntryCacheManager) spy(i1.callRealMethod());
+ doAnswer(i2 -> spy(i2.callRealMethod())).when(manager).getEntryCache(any());
+ return manager;
+ }).when(factory).getEntryCacheManager();
+ return factory;
+ }).when(pulsar).getManagedLedgerFactory();
+ }
+
/**
* Usecase 1: Only 1 Active Subscription - 1 subscriber - Produce Messages - EntryCache should cache messages -
* EntryCache should be cleaned : Once active subscription consumes messages
@@ -648,10 +666,9 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get();
ManagedLedgerImpl ledger = (ManagedLedgerImpl) topicRef.getManagedLedger();
- EntryCache entryCache = spy((EntryCache) Whitebox.getInternalState(ledger, "entryCache"));
- Whitebox.setInternalState(ledger, "entryCache", entryCache);
+ EntryCache entryCache = (EntryCache) FieldUtils.readField(ledger, "entryCache", true);
- Message<byte[]>msg = null;
+ Message<byte[]> msg = null;
// 2. Produce messages
for (int i = 0; i < 30; i++) {
String message = "my-message-" + i;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
index 25a2539582e..c5d03038039 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
@@ -20,16 +20,11 @@ package org.apache.pulsar.client.impl;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
-
-import java.lang.reflect.Field;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
-
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
@@ -38,7 +33,6 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
-import org.powermock.reflect.Whitebox;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
@@ -267,7 +261,7 @@ public class NegativeAcksTest extends ProducerConsumerBase {
public void testNegativeAcksDeleteFromUnackedTracker() throws Exception {
String topic = BrokerTestUtil.newUniqueName("testNegativeAcksDeleteFromUnackedTracker");
@Cleanup
- Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ ConsumerImpl<String> consumer = (ConsumerImpl<String>) pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("sub1")
.acknowledgmentGroupTime(0, TimeUnit.SECONDS)
@@ -282,18 +276,15 @@ public class NegativeAcksTest extends ProducerConsumerBase {
BatchMessageIdImpl batchMessageId2 = new BatchMessageIdImpl(3, 1, 0, 1);
BatchMessageIdImpl batchMessageId3 = new BatchMessageIdImpl(3, 1, 0, 2);
- UnAckedMessageTracker unAckedMessageTracker = ((ConsumerImpl) consumer).getUnAckedMessageTracker();
+ UnAckedMessageTracker unAckedMessageTracker = consumer.getUnAckedMessageTracker();
unAckedMessageTracker.add(topicMessageId);
- Field fieldNegativeAcksTracker = Whitebox.getField(ConsumerImpl.class, "negativeAcksTracker");
- NegativeAcksTracker negativeAcksTracker = (NegativeAcksTracker) fieldNegativeAcksTracker.get(((ConsumerImpl) consumer));
- Field fieldNackedMessages = Whitebox.getField(NegativeAcksTracker.class, "nackedMessages");
// negative topic message id
consumer.negativeAcknowledge(topicMessageId);
- HashMap<MessageId, Long> nackedMessages = (HashMap)fieldNackedMessages.get(negativeAcksTracker);
- assertEquals(nackedMessages.size(), 1);
+ NegativeAcksTracker negativeAcksTracker = consumer.getNegativeAcksTracker();
+ assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse(-1).intValue(), 1);
assertEquals(unAckedMessageTracker.size(), 0);
- nackedMessages.clear();
+ negativeAcksTracker.close();
// negative batch message id
unAckedMessageTracker.add(batchMessageId);
unAckedMessageTracker.add(batchMessageId2);
@@ -301,9 +292,9 @@ public class NegativeAcksTest extends ProducerConsumerBase {
consumer.negativeAcknowledge(batchMessageId);
consumer.negativeAcknowledge(batchMessageId2);
consumer.negativeAcknowledge(batchMessageId3);
- assertEquals(nackedMessages.size(), 1);
+ assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse(-1).intValue(), 1);
assertEquals(unAckedMessageTracker.size(), 0);
- nackedMessages.clear();
+ negativeAcksTracker.close();
}
@Test(timeOut = 10000)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 5e84a30e867..38251a7ad86 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import static org.apache.pulsar.common.protocol.Commands.hasChecksum;
import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Iterables;
import com.scurrilous.circe.checksum.Crc32cIntChecksum;
@@ -391,6 +392,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
return unAckedMessageTracker;
}
+ @VisibleForTesting
+ NegativeAcksTracker getNegativeAcksTracker() {
+ return negativeAcksTracker;
+ }
+
@Override
public CompletableFuture<Void> unsubscribeAsync() {
if (getState() == State.Closing || getState() == State.Closed) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
index 6273f4d582e..86121dd2c34 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
@@ -19,11 +19,13 @@
package org.apache.pulsar.client.impl;
import static org.apache.pulsar.client.impl.UnAckedMessageTracker.addChunkedMessageIdsAndRemoveFromSequenceMap;
+import com.google.common.annotations.VisibleForTesting;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import java.io.Closeable;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Message;
@@ -123,6 +125,11 @@ class NegativeAcksTracker implements Closeable {
}
}
+ @VisibleForTesting
+ Optional<Integer> getNackedMessagesCount() {
+ return Optional.ofNullable(nackedMessages).map(HashMap::size);
+ }
+
@Override
public synchronized void close() {
if (timeout != null && !timeout.isCancelled()) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index b0e78273dc6..6f0b75fb540 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -994,7 +994,7 @@ public class PulsarClientImpl implements PulsarClient {
}
@VisibleForTesting
- void setLookup(LookupService lookup) {
+ public void setLookup(LookupService lookup) {
this.lookup = lookup;
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
index cc2a3519565..7f3d5bfd5d5 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
@@ -396,8 +396,6 @@ public class AvroSchemaTest {
// Because data does not conform to schema expect a crash
Assert.assertThrows( SchemaSerializationException.class, () -> avroWriter.write(badNasaMissionData));
- // Get the buffered data using powermock
-
// Assert that the buffer position is reset to zero
Assert.assertEquals(avroWriter.getEncoder().bytesBuffered(), 0);
}