You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ni...@apache.org on 2022/09/17 09:27:45 UTC

[pulsar] branch master updated: [improve][test] remove powermock-reflect dependency (#17696)

This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 492c7df1579 [improve][test] remove powermock-reflect dependency (#17696)
492c7df1579 is described below

commit 492c7df1579c12701da6a77af1986d3aa7ace840
Author: tison <wa...@gmail.com>
AuthorDate: Sat Sep 17 17:27:34 2022 +0800

    [improve][test] remove powermock-reflect dependency (#17696)
---
 pom.xml                                            | 13 ---
 .../pulsar/broker/namespace/OwnershipCache.java    |  6 +-
 .../service/persistent/PersistentSubscription.java |  5 ++
 .../broker/service/persistent/PersistentTopic.java |  5 ++
 .../pendingack/impl/PendingAckHandleImpl.java      |  6 ++
 .../broker/BookKeeperClientFactoryImplTest.java    |  8 +-
 .../pulsar/broker/PulsarServiceCloseTest.java      | 21 +++--
 .../broker/admin/AdminApiGetLastMessageIdTest.java | 19 +----
 .../pulsar/broker/admin/PersistentTopicsTest.java  |  7 +-
 .../pulsar/broker/admin/TopicAutoCreationTest.java | 28 +++----
 .../org/apache/pulsar/broker/admin/TopicsTest.java |  4 +-
 .../broker/loadbalance/LoadBalancerTest.java       | 78 ++++++++---------
 .../loadbalance/ModularLoadManagerImplTest.java    | 30 ++-----
 .../broker/namespace/NamespaceServiceTest.java     |  9 +-
 .../broker/service/ExclusiveProducerTest.java      |  9 +-
 .../pulsar/broker/service/TopicOwnerTest.java      | 97 ++--------------------
 .../broker/stats/ManagedCursorMetricsTest.java     |  5 +-
 .../systopic/PartitionedSystemTopicTest.java       | 25 +++---
 .../pulsar/broker/transaction/TransactionTest.java | 52 ++++++------
 .../buffer/TopicTransactionBufferTest.java         |  4 +-
 .../buffer/TransactionLowWaterMarkTest.java        | 12 +--
 .../pendingack/PendingAckInMemoryDeleteTest.java   | 34 ++++----
 .../client/api/SimpleProducerConsumerTest.java     | 24 +++++-
 .../client/api/v1/V1_ProducerConsumerTest.java     | 25 +++++-
 .../pulsar/client/impl/NegativeAcksTest.java       | 23 ++---
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  6 ++
 .../pulsar/client/impl/NegativeAcksTracker.java    |  7 ++
 .../pulsar/client/impl/PulsarClientImpl.java       |  2 +-
 .../pulsar/client/impl/schema/AvroSchemaTest.java  |  2 -
 29 files changed, 246 insertions(+), 320 deletions(-)

diff --git a/pom.xml b/pom.xml
index 5618e95b660..5f147915ff1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -241,7 +241,6 @@ flexible messaging model and an intuitive client API.</description>
     <kerby.version>1.1.1</kerby.version>
     <testng.version>7.3.0</testng.version>
     <mockito.version>3.12.4</mockito.version>
-    <powermock.version>2.0.9</powermock.version>
     <javassist.version>3.25.0-GA</javassist.version>
     <skyscreamer.version>1.5.0</skyscreamer.version>
     <objenesis.version>3.1</objenesis.version>
@@ -340,12 +339,6 @@ flexible messaging model and an intuitive client API.</description>
         <version>${mockito.version}</version>
       </dependency>
 
-      <dependency>
-        <groupId>org.powermock</groupId>
-        <artifactId>powermock-reflect</artifactId>
-        <version>${powermock.version}</version>
-      </dependency>
-
       <dependency>
         <groupId>org.apache.zookeeper</groupId>
         <artifactId>zookeeper</artifactId>
@@ -1331,12 +1324,6 @@ flexible messaging model and an intuitive client API.</description>
       <scope>test</scope>
     </dependency>
 
-    <dependency>
-      <groupId>org.powermock</groupId>
-      <artifactId>powermock-reflect</artifactId>
-      <scope>test</scope>
-    </dependency>
-
     <dependency>
       <groupId>org.assertj</groupId>
       <artifactId>assertj-core</artifactId>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
index 67e986b804c..9c2030997fb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
@@ -106,7 +106,7 @@ public class OwnershipCache {
                                 .thenRun(() -> {
                                     log.info("Resource lock for {} has expired", rl.getPath());
                                     namespaceService.unloadNamespaceBundle(namespaceBundle);
-                                    ownedBundlesCache.synchronous().invalidate(namespaceBundle);
+                                    invalidateLocalOwnerCache(namespaceBundle);
                                     namespaceService.onNamespaceBundleUnload(namespaceBundle);
                                 });
                         return new OwnedBundle(namespaceBundle);
@@ -330,6 +330,10 @@ public class OwnershipCache {
         this.ownedBundlesCache.synchronous().invalidateAll();
     }
 
+    public void invalidateLocalOwnerCache(NamespaceBundle namespaceBundle) {
+        this.ownedBundlesCache.synchronous().invalidate(namespaceBundle);
+    }
+
     public synchronized boolean refreshSelfOwnerInfo() {
         this.selfOwnerInfo = new NamespaceEphemeralData(pulsar.getBrokerServiceUrl(),
                 pulsar.getBrokerServiceUrlTls(), pulsar.getSafeWebServiceAddress(),
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 12359d5c413..e5d6251d177 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -1281,6 +1281,11 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
         return cursor;
     }
 
+    @VisibleForTesting
+    public PendingAckHandle getPendingAckHandle() {
+        return pendingAckHandle;
+    }
+
     public void syncBatchPositionBitSetForPendingAck(PositionImpl position) {
         this.pendingAckHandle.syncBatchPositionAckSetForTransaction(position);
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 0893ecf9ea8..484120cce24 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -382,6 +382,11 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         }
     }
 
+    @VisibleForTesting
+    public AtomicLong getPendingWriteOps() {
+        return pendingWriteOps;
+    }
+
     private PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor,
             boolean replicated, Map<String, String> subscriptionProperties) {
         checkNotNull(compactedTopic);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index 283bc038d76..a565a20b664 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.transaction.pendingack.impl;
 import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.andAckSet;
 import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.compareToWithAckSet;
 import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.isAckSetOverlap;
+import com.google.common.annotations.VisibleForTesting;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -1027,6 +1028,11 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
         }
     }
 
+    @VisibleForTesting
+    public Map<PositionImpl, MutablePair<PositionImpl, Integer>> getIndividualAckPositions() {
+        return individualAckPositions;
+    }
+
     @Override
     public boolean checkIfPendingAckStoreInit() {
         return this.pendingAckStoreFuture != null && this.pendingAckStoreFuture.isDone();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java
index e26b0aa7561..b9c32e91e4c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java
@@ -37,10 +37,10 @@ import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.CachedDNSToSwitchMapping;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.pulsar.bookie.rackawareness.BookieRackAffinityMapping;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
-import org.powermock.reflect.Whitebox;
 import org.testng.annotations.Test;
 
 /**
@@ -281,7 +281,7 @@ public class BookKeeperClientFactoryImplTest {
     }
 
     @Test
-    public void testBookKeeperIoThreadsConfiguration() {
+    public void testBookKeeperIoThreadsConfiguration() throws Exception {
         BookKeeperClientFactoryImpl factory = new BookKeeperClientFactoryImpl();
         ServiceConfiguration conf = new ServiceConfiguration();
         assertEquals(factory.createBkClientConfiguration(mock(MetadataStoreExtended.class), conf)
@@ -292,11 +292,11 @@ public class BookKeeperClientFactoryImplTest {
         EventLoopGroup eventLoopGroup = mock(EventLoopGroup.class);
         BookKeeper.Builder builder = factory.getBookKeeperBuilder(conf, eventLoopGroup,
                 mock(StatsLogger.class), mock(ClientConfiguration.class));
-        assertEquals(Whitebox.getInternalState(builder, "eventLoopGroup"), eventLoopGroup);
+        assertEquals(FieldUtils.readField(builder, "eventLoopGroup", true), eventLoopGroup);
         conf.setBookkeeperClientSeparatedIoThreadsEnabled(true);
         builder = factory.getBookKeeperBuilder(conf, eventLoopGroup,
                 mock(StatsLogger.class), mock(ClientConfiguration.class));
-        assertNull(Whitebox.getInternalState(builder, "eventLoopGroup"));
+        assertNull(FieldUtils.readField(builder, "eventLoopGroup", true));
     }
 
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceCloseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceCloseTest.java
index c424132855b..1fbb40a6a56 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceCloseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceCloseTest.java
@@ -24,9 +24,9 @@ import static org.testng.AssertJUnit.assertTrue;
 
 import java.util.concurrent.ScheduledFuture;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.loadbalance.LoadSheddingTask;
-import org.awaitility.reflect.WhiteboxImpl;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -62,18 +62,21 @@ public class PulsarServiceCloseTest extends MockedPulsarServiceBaseTest {
     @Test(timeOut = 30_000)
     public void closeInTimeTest() throws Exception {
         LoadSheddingTask task = pulsar.getLoadSheddingTask();
-        boolean isCancel = WhiteboxImpl.getInternalState(task, "isCancel");
-        assertFalse(isCancel);
-        ScheduledFuture<?> loadSheddingFuture = WhiteboxImpl.getInternalState(task, "future");
-        assertFalse(loadSheddingFuture.isCancelled());
+
+        {
+            assertFalse((boolean) FieldUtils.readField(task, "isCancel", true));
+            ScheduledFuture<?> loadSheddingFuture = (ScheduledFuture<?>) FieldUtils.readField(task, "future", true);
+            assertFalse(loadSheddingFuture.isCancelled());
+        }
 
         // The pulsar service is not used, so it should be closed gracefully in short time.
         pulsar.close();
 
-        isCancel = WhiteboxImpl.getInternalState(task, "isCancel");
-        assertTrue(isCancel);
-        loadSheddingFuture = WhiteboxImpl.getInternalState(task, "future");
-        assertTrue(loadSheddingFuture.isCancelled());
+        {
+            assertTrue((boolean) FieldUtils.readField(task, "isCancel", true));
+            ScheduledFuture<?> loadSheddingFuture = (ScheduledFuture<?>) FieldUtils.readField(task, "future", true);
+            assertTrue(loadSheddingFuture.isCancelled());
+        }
     }
 
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java
index 2a97bc4f8f2..d9199586181 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java
@@ -22,7 +22,6 @@ import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
-import java.lang.reflect.Field;
 import java.util.Collection;
 import java.util.Date;
 import java.util.Map;
@@ -32,13 +31,11 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.TimeoutHandler;
-import javax.ws.rs.core.UriInfo;
 import org.apache.pulsar.broker.admin.v2.PersistentTopics;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
-import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
@@ -51,26 +48,16 @@ import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker-admin")
 public class AdminApiGetLastMessageIdTest extends MockedPulsarServiceBaseTest {
 
-    private PersistentTopics persistentTopics;
-    private final String testTenant = "my-tenant";
-    private final String testLocalCluster = "use";
-    private final String testNamespace = "my-namespace";
-    protected Field uriField;
-    protected UriInfo uriInfo;
+    private static final String testTenant = "my-tenant";
+    private static final String testNamespace = "my-namespace";
 
-    @BeforeClass
-    public void initPersistentTopics() throws Exception {
-        uriField = PulsarWebResource.class.getDeclaredField("uri");
-        uriField.setAccessible(true);
-        uriInfo = mock(UriInfo.class);
-    }
+    private PersistentTopics persistentTopics;
 
     @Override
     @BeforeMethod
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index de555c1715d..3a9bd21245b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.broker.admin;
 
-import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyString;
@@ -94,7 +93,6 @@ import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.zookeeper.KeeperException;
 import org.awaitility.Awaitility;
 import org.mockito.ArgumentCaptor;
-import org.powermock.reflect.Whitebox;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
@@ -149,9 +147,8 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
 
         PulsarResources resources =
                 spy(new PulsarResources(pulsar.getLocalMetadataStore(), pulsar.getConfigurationMetadataStore()));
-        doReturn(spyWithClassAndConstructorArgs(TopicResources.class, pulsar.getLocalMetadataStore())).when(resources)
-                .getTopicResources();
-        Whitebox.setInternalState(pulsar, "pulsarResources", resources);
+        doReturn(spy(new TopicResources(pulsar.getLocalMetadataStore()))).when(resources).getTopicResources();
+        doReturn(resources).when(pulsar).getPulsarResources();
 
         admin.clusters().createCluster("use", ClusterData.builder().serviceUrl("http://broker-use.com:8080").build());
         admin.clusters().createCluster("test", ClusterData.builder().serviceUrl("http://broker-use.com:8080").build());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
index 7bd15992f64..e8c0683569f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
@@ -38,9 +38,9 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.LookupService;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
-import org.powermock.reflect.Whitebox;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -122,15 +122,14 @@ public class TopicAutoCreationTest extends ProducerConsumerBase {
         });
 
 
-        LookupService original = Whitebox.getInternalState(pulsarClient, "lookup");
+        LookupService original = ((PulsarClientImpl) pulsarClient).getLookup();
         try {
 
             // we want to skip the "lookup" phase, because it is blocked by the HTTP API
             LookupService mockLookup = mock(LookupService.class);
-            Whitebox.setInternalState(pulsarClient, "lookup", mockLookup);
-            when(mockLookup.getPartitionedTopicMetadata(any())).thenAnswer(i -> {
-                return CompletableFuture.completedFuture(new PartitionedTopicMetadata(0));
-            });
+            ((PulsarClientImpl) pulsarClient).setLookup(mockLookup);
+            when(mockLookup.getPartitionedTopicMetadata(any())).thenAnswer(
+                    i -> CompletableFuture.completedFuture(new PartitionedTopicMetadata(0)));
             when(mockLookup.getBroker(any())).thenAnswer(i -> {
                 InetSocketAddress brokerAddress =
                         new InetSocketAddress(pulsar.getAdvertisedAddress(), pulsar.getBrokerListenPort().get());
@@ -139,20 +138,20 @@ public class TopicAutoCreationTest extends ProducerConsumerBase {
 
             // Creating a producer and creating a Consumer may trigger automatic topic
             // creation, let's try to create a Producer and a Consumer
-            try (Producer<byte[]> producer = pulsarClient.newProducer()
+            try (Producer<byte[]> ignored = pulsarClient.newProducer()
                     .sendTimeout(1, TimeUnit.SECONDS)
                     .topic(topic)
-                    .create();) {
+                    .create()) {
             } catch (PulsarClientException.LookupException expected) {
                 String msg = "Namespace bundle for topic (%s) not served by this instance";
                 log.info("Expected error", expected);
                 assertTrue(expected.getMessage().contains(String.format(msg, topic)));
             }
 
-            try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
+            try (Consumer<byte[]> ignored = pulsarClient.newConsumer()
                     .topic(topic)
                     .subscriptionName("test")
-                    .subscribe();) {
+                    .subscribe()) {
             } catch (PulsarClientException.LookupException expected) {
                 String msg = "Namespace bundle for topic (%s) not served by this instance";
                 log.info("Expected error", expected);
@@ -170,17 +169,16 @@ public class TopicAutoCreationTest extends ProducerConsumerBase {
             admin.topics().getList(namespaceName).isEmpty();
 
             // create now the topic using auto creation
-            Whitebox.setInternalState(pulsarClient, "lookup", original);
-
-            try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
+            ((PulsarClientImpl) pulsarClient).setLookup(original);
+            try (Consumer<byte[]> ignored = pulsarClient.newConsumer()
                     .topic(topic)
                     .subscriptionName("test")
-                    .subscribe();) {
+                    .subscribe()) {
             }
 
             admin.topics().getList(namespaceName).contains(topic);
         } finally {
-            Whitebox.setInternalState(pulsarClient, "lookup", original);
+            ((PulsarClientImpl) pulsarClient).setLookup(original);
         }
 
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
index e7dfff6de15..49ead1c5fcb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
@@ -49,6 +49,7 @@ import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.io.JsonEncoder;
 import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.avro.util.Utf8;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
@@ -88,7 +89,6 @@ import org.apache.pulsar.websocket.data.ProducerMessages;
 import org.mockito.ArgumentCaptor;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
-import org.powermock.reflect.Whitebox;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -320,7 +320,7 @@ public class TopicsTest extends MockedPulsarServiceBaseTest {
         doReturn(false).when(topics).isRequestHttps();
         UriInfo uriInfo = mock(UriInfo.class);
         doReturn(requestPath).when(uriInfo).getRequestUri();
-        Whitebox.setInternalState(topics, "uri", uriInfo);
+        FieldUtils.writeField(topics, "uri", uriInfo, true);
         //do produce on another broker
         topics.setPulsar(pulsar2);
         AsyncResponse asyncResponse = mock(AsyncResponse.class);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
index 8be88350ea6..5d62ec2c58c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.broker.loadbalance;
 
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -26,7 +25,6 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import java.lang.reflect.Field;
-import java.lang.reflect.Method;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -35,13 +33,17 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import lombok.SneakyThrows;
 import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.commons.lang3.reflect.MethodUtils;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.loadbalance.impl.PulsarResourceDescription;
@@ -68,7 +70,8 @@ import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.ZooDefs;
 import org.awaitility.Awaitility;
-import org.powermock.reflect.Whitebox;
+import org.mockito.MockedConstruction;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -92,12 +95,12 @@ public class LoadBalancerTest {
     private static final int MAX_RETRIES = 15;
 
     private static final int BROKER_COUNT = 5;
-    private int[] brokerWebServicePorts = new int[BROKER_COUNT];
-    private int[] brokerNativeBrokerPorts = new int[BROKER_COUNT];
-    private URL[] brokerUrls = new URL[BROKER_COUNT];
-    private String[] lookupAddresses = new String[BROKER_COUNT];
-    private PulsarService[] pulsarServices = new PulsarService[BROKER_COUNT];
-    private PulsarAdmin[] pulsarAdmins = new PulsarAdmin[BROKER_COUNT];
+    private final int[] brokerWebServicePorts = new int[BROKER_COUNT];
+    private final int[] brokerNativeBrokerPorts = new int[BROKER_COUNT];
+    private final URL[] brokerUrls = new URL[BROKER_COUNT];
+    private final String[] lookupAddresses = new String[BROKER_COUNT];
+    private final PulsarService[] pulsarServices = new PulsarService[BROKER_COUNT];
+    private final PulsarAdmin[] pulsarAdmins = new PulsarAdmin[BROKER_COUNT];
 
     @BeforeMethod
     void setup() throws Exception {
@@ -222,7 +225,7 @@ public class LoadBalancerTest {
     }
 
     /*
-     * tests rankings get updated when we write write the new load reports to the zookeeper on loadbalance root node
+     * tests rankings get updated when we write the new load reports to the zookeeper on load-balance root node
      * tests writing pre-configured load report on the zookeeper translates the pre-calculated rankings
      */
     @Test
@@ -237,18 +240,15 @@ public class LoadBalancerTest {
             sru.setCpu(new ResourceUsage(5, 400));
             lr.setSystemResourceUsage(sru);
 
-            Whitebox.setInternalState(pulsarServices[0].getLoadManager().get(), "lastLoadReport", lr);
-            ResourceLock<LoadReport> lock = Whitebox.getInternalState(pulsarServices[i].getLoadManager().get(),
-                    "brokerLock");
-            lock.updateValue(lr).join();
+            FieldUtils.writeField(pulsarServices[0].getLoadManager().get(), "lastLoadReport", lr, true);
+            updateLastReport(pulsarServices[i].getLoadManager().get(), lr);
         }
 
         for (int i = 0; i < BROKER_COUNT; i++) {
-            Method updateRanking = Whitebox.getMethod(SimpleLoadManagerImpl.class, "updateRanking");
-            updateRanking.invoke(pulsarServices[0].getLoadManager().get());
+            MethodUtils.invokeMethod(pulsarServices[0].getLoadManager().get(), true, "updateRanking");
         }
 
-        // do lookup for bunch of bundles
+        // do lookup for a bunch of bundles
         int totalNamespaces = 200;
         Map<String, Integer> namespaceOwner = new HashMap<>();
         for (int i = 0; i < totalNamespaces; i++) {
@@ -275,6 +275,13 @@ public class LoadBalancerTest {
         }
     }
 
+    @SuppressWarnings("unchecked")
+    @SneakyThrows
+    private void updateLastReport(LoadManager lm, LoadReport lr){
+        ResourceLock<LoadReport> lock = (ResourceLock<LoadReport>) FieldUtils.readField(lm, "brokerLock", true);
+        lock.updateValue(lr).join();
+    }
+
     private AtomicReference<Map<Long, Set<ResourceUnit>>> getSortedRanking(PulsarService pulsar)
             throws NoSuchFieldException, IllegalAccessException {
         Field ranking = ((SimpleLoadManagerImpl) pulsar.getLoadManager().get()).getClass()
@@ -312,17 +319,14 @@ public class LoadBalancerTest {
             sru.setCpu(new ResourceUsage(60, 400));
             lr.setSystemResourceUsage(sru);
 
-            ResourceLock<LoadReport> lock = Whitebox.getInternalState(pulsarServices[i].getLoadManager().get(),
-                    "brokerLock");
-            lock.updateValue(lr).join();
+            updateLastReport(pulsarServices[i].getLoadManager().get(), lr);
         }
 
         for (int i = 0; i < BROKER_COUNT; i++) {
-            Method method = Whitebox.getMethod(SimpleLoadManagerImpl.class, "getUpdateRankingHandle");
             LoadManager loadManager = pulsarServices[i].getLoadManager().get();
             Awaitility.await().until(() -> {
-                Object invoke = method.invoke(loadManager);
-                return invoke != null && ((Future) invoke).isDone();
+                Future<?> f = ((SimpleLoadManagerImpl) loadManager).getUpdateRankingHandle();
+                return f != null && f.isDone();
             });
         }
 
@@ -376,15 +380,12 @@ public class LoadBalancerTest {
             }
             lr.setBundleStats(bundleStats);
 
-            Whitebox.setInternalState(pulsarServices[0].getLoadManager().get(), "lastLoadReport", lr);
-            ResourceLock<LoadReport> lock = Whitebox.getInternalState(pulsarServices[i].getLoadManager().get(),
-                    "brokerLock");
-            lock.updateValue(lr).join();
+            FieldUtils.writeField(pulsarServices[0].getLoadManager().get(), "lastLoadReport", lr, true);
+            updateLastReport(pulsarServices[i].getLoadManager().get(), lr);
         }
 
         for (int i = 0; i < BROKER_COUNT; i++) {
-            Method updateRanking = Whitebox.getMethod(SimpleLoadManagerImpl.class, "updateRanking");
-            updateRanking.invoke(pulsarServices[0].getLoadManager().get());
+            MethodUtils.invokeMethod(pulsarServices[0].getLoadManager().get(), true, "updateRanking");
         }
 
         // print ranking
@@ -469,10 +470,7 @@ public class LoadBalancerTest {
                 bundleStats.put(bundleName, stats);
             }
             lr.setBundleStats(bundleStats);
-
-            ResourceLock<LoadReport> lock = Whitebox.getInternalState(pulsarServices[i].getLoadManager().get(),
-                    "brokerLock");
-            lock.updateValue(lr).join();
+            updateLastReport(pulsarServices[i].getLoadManager().get(), lr);
         }
     }
 
@@ -597,8 +595,12 @@ public class LoadBalancerTest {
         }
 
         // fake Namespaces Admin
-        NamespacesImpl namespaceAdmin = mock(NamespacesImpl.class);
-        Whitebox.setInternalState(pulsarServices[0].getAdminClient(), "namespaces", namespaceAdmin);
+        CompletableFuture<NamespacesImpl> namespaceAdminFuture = new CompletableFuture<>();
+        try (MockedConstruction<NamespacesImpl> ignore = Mockito.mockConstruction(
+                NamespacesImpl.class, (allocator, context) -> namespaceAdminFuture.complete(allocator))) {
+            pulsarServices[0].getAdminClient();
+        }
+        NamespacesImpl namespaceAdmin = namespaceAdminFuture.get();
 
         // create load report
         // namespace 01~09 need to be split
@@ -631,10 +633,8 @@ public class LoadBalancerTest {
                 newBundleStats(maxTopics + 1, 0, 0, 0, 0, 0, 0));
         lr.setBundleStats(bundleStats);
 
-        Whitebox.setInternalState(pulsarServices[0].getLoadManager().get(), "lastLoadReport", lr);
-        ResourceLock<LoadReport> lock = Whitebox.getInternalState(pulsarServices[0].getLoadManager().get(),
-                "brokerLock");
-        lock.updateValue(lr).join();
+        FieldUtils.writeField(pulsarServices[0].getLoadManager().get(), "lastLoadReport", lr, true);
+        updateLastReport(pulsarServices[0].getLoadManager().get(), lr);
 
         // sleep to wait load ranking be triggered and trigger bundle split
         Thread.sleep(5000);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
index a889bedb7c1..9b7ea315f59 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
@@ -85,7 +85,6 @@ import org.apache.pulsar.policies.data.loadbalancer.TimeAverageMessageData;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
 import org.awaitility.Awaitility;
 import org.mockito.Mockito;
-import org.powermock.reflect.Whitebox;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -450,20 +449,16 @@ public class ModularLoadManagerImplTest {
 
     /**
      * It verifies that deletion of broker-znode on broker-stop will invalidate availableBrokerCache list
-     *
-     * @throws Exception
      */
     @Test
     public void testBrokerStopCacheUpdate() throws Exception {
         ModularLoadManagerWrapper loadManagerWrapper = (ModularLoadManagerWrapper) pulsar1.getLoadManager().get();
-        ModularLoadManagerImpl lm = Whitebox.getInternalState(loadManagerWrapper, "loadManager");
+        ModularLoadManagerImpl lm = (ModularLoadManagerImpl) loadManagerWrapper.getLoadManager();
         assertEquals(lm.getAvailableBrokers().size(), 2);
 
         pulsar2.close();
 
-        Awaitility.await().untilAsserted(() -> {
-            assertEquals(lm.getAvailableBrokers().size(), 1);
-        });
+        Awaitility.await().untilAsserted(() -> assertEquals(lm.getAvailableBrokers().size(), 1));
     }
 
     /**
@@ -481,8 +476,6 @@ public class ModularLoadManagerImplTest {
      *     b. available-brokers: broker2, broker3          => result: broker2
      *     c. available-brokers: broker3                   => result: NULL
      * </pre>
-     *
-     * @throws Exception
      */
     @Test
     public void testNamespaceIsolationPoliciesForPrimaryAndSecondaryBrokers() throws Exception {
@@ -664,27 +657,20 @@ public class ModularLoadManagerImplTest {
     @Test
     public void testRemoveDeadBrokerTimeAverageData() throws Exception {
         ModularLoadManagerWrapper loadManagerWrapper = (ModularLoadManagerWrapper) pulsar1.getLoadManager().get();
-        ModularLoadManagerImpl lm = Whitebox.getInternalState(loadManagerWrapper, "loadManager");
+        ModularLoadManagerImpl lm = (ModularLoadManagerImpl) loadManagerWrapper.getLoadManager();
         assertEquals(lm.getAvailableBrokers().size(), 2);
 
         pulsar2.close();
 
-        Awaitility.await().untilAsserted(() -> {
-            assertEquals(lm.getAvailableBrokers().size(), 1);
-        });
+        Awaitility.await().untilAsserted(() -> assertEquals(lm.getAvailableBrokers().size(), 1));
         lm.updateAll();
 
         List<String> data =  pulsar1.getLocalMetadataStore()
-                .getMetadataCache(TimeAverageBrokerData.class).getChildren(TIME_AVERAGE_BROKER_ZPATH).join();
-
-        Awaitility.await().untilAsserted(() -> {
-            assertTrue(pulsar1.getLeaderElectionService().isLeader());
-        });
+                .getMetadataCache(TimeAverageBrokerData.class)
+                .getChildren(TIME_AVERAGE_BROKER_ZPATH)
+                .join();
 
+        Awaitility.await().untilAsserted(() -> assertTrue(pulsar1.getLeaderElectionService().isLeader()));
         assertEquals(data.size(), 1);
-
-
-
-
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index 6f5462c1cea..cb806d3ccfe 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -49,6 +49,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.loadbalance.LoadData;
 import org.apache.pulsar.broker.loadbalance.LoadManager;
@@ -87,7 +88,6 @@ import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
 import org.apache.pulsar.policies.data.loadbalancer.BundleData;
 import org.awaitility.Awaitility;
 import org.mockito.stubbing.Answer;
-import org.powermock.reflect.Whitebox;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -822,11 +822,10 @@ public class NamespaceServiceTest extends BrokerTestBase {
         // Wait until "ModularLoadManager" completes processing the ZK notification.
         ModularLoadManagerWrapper modularLoadManagerWrapper = (ModularLoadManagerWrapper) loadManager;
         ModularLoadManagerImpl modularLoadManager = (ModularLoadManagerImpl) modularLoadManagerWrapper.getLoadManager();
-        ScheduledExecutorService scheduler = Whitebox.getInternalState(modularLoadManager, "scheduler");
+        ScheduledExecutorService scheduler = (ScheduledExecutorService) FieldUtils.readField(
+                modularLoadManager, "scheduler", true);
         CompletableFuture<Void> waitForNoticeHandleFinishByLoadManager = new CompletableFuture<>();
-        scheduler.execute(() -> {
-            waitForNoticeHandleFinishByLoadManager.complete(null);
-        });
+        scheduler.execute(() -> waitForNoticeHandleFinishByLoadManager.complete(null));
         waitForNoticeHandleFinishByLoadManager.join();
         // Manually trigger "LoadResourceQuotaUpdaterTask"
         loadManager.writeResourceQuotasToZooKeeper();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
index 604abd8d709..124372ea15a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
@@ -39,7 +39,6 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.naming.TopicName;
 import org.awaitility.Awaitility;
-import org.powermock.reflect.Whitebox;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -294,17 +293,13 @@ public class ExclusiveProducerTest extends BrokerTestBase {
         // Simulate a producer that takes over and fences p1 through the topic epoch
         if (!partitioned) {
             Topic t = pulsar.getBrokerService().getTopic(topic, false).get().get();
-            CompletableFuture<?> f = (CompletableFuture<?>) Whitebox
-                    .getMethod(AbstractTopic.class, "incrementTopicEpoch", Optional.class)
-                    .invoke(t, Optional.of(0L));
+            CompletableFuture<?> f = ((AbstractTopic) t).incrementTopicEpoch(Optional.of(0L));
             f.get();
         } else {
             for (int i = 0; i < 3; i++) {
                 String name = TopicName.get(topic).getPartition(i).toString();
                 Topic t = pulsar.getBrokerService().getTopic(name, false).get().get();
-                CompletableFuture<?> f = (CompletableFuture<?>) Whitebox
-                        .getMethod(AbstractTopic.class, "incrementTopicEpoch", Optional.class)
-                        .invoke(t, Optional.of(0L));
+                CompletableFuture<?> f = ((AbstractTopic) t).incrementTopicEpoch(Optional.of(0L));
                 f.get();
             }
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
index d8dadaf8b5b..d8485f3f051 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
@@ -19,11 +19,8 @@
 package org.apache.pulsar.broker.service;
 
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.nullable;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.spy;
-
-import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
 import com.google.common.collect.Sets;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -31,15 +28,14 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import lombok.Cleanup;
-import org.apache.commons.lang3.mutable.MutableBoolean;
+import lombok.SneakyThrows;
 import org.apache.commons.lang3.mutable.MutableObject;
-import org.apache.jute.Record;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.lookup.LookupResult;
 import org.apache.pulsar.broker.namespace.LookupOptions;
 import org.apache.pulsar.broker.namespace.NamespaceService;
-import org.apache.pulsar.broker.namespace.OwnedBundle;
 import org.apache.pulsar.broker.namespace.OwnershipCache;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Consumer;
@@ -48,16 +44,8 @@ import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
-import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
-import org.apache.pulsar.metadata.api.extended.SessionEvent;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.proto.ReplyHeader;
-import org.apache.zookeeper.server.Request;
-import org.apache.zookeeper.server.ServerCnxn;
-import org.apache.zookeeper.server.ZooKeeperServer;
 import org.mockito.stubbing.Answer;
-import org.powermock.reflect.Whitebox;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -136,6 +124,7 @@ public class TopicOwnerTest {
     }
 
     @SuppressWarnings("unchecked")
+    @SneakyThrows(IllegalAccessException.class)
     private MutableObject<PulsarService> spyLeaderNamespaceServiceForAuthorizedBroker() {
         // Spy leader namespace service to inject authorized broker for namespace-bundle from leader,
         // this is a safe operation since it is just an recommendation if namespace-bundle has no owner
@@ -157,85 +146,10 @@ public class TopicOwnerTest {
             return CompletableFuture.completedFuture(Optional.of(lookupResult));
         };
         doAnswer(answer).when(spyLeaderNamespaceService).getBrokerServiceUrlAsync(any(TopicName.class), any(LookupOptions.class));
-        Whitebox.setInternalState(leaderPulsar, "nsService", spyLeaderNamespaceService);
+        FieldUtils.writeField(leaderPulsar, "nsService", spyLeaderNamespaceService, true);
         return leaderAuthorizedBroker;
     }
 
-    private CompletableFuture<Void> watchMetadataStoreReconnect(MetadataStoreExtended store) {
-        CompletableFuture<Void> reconnectedFuture = new CompletableFuture<>();
-        store.registerSessionListener(event -> {
-            if (event == SessionEvent.Reconnected || event == SessionEvent.SessionReestablished) {
-                reconnectedFuture.complete(null);
-            }
-        });
-
-        return reconnectedFuture;
-    }
-
-    @FunctionalInterface
-    interface RequestMatcher {
-        boolean match(Request request) throws Exception;
-    }
-
-    private void spyZookeeperToDisconnectBeforePersist(ZooKeeper zooKeeper, RequestMatcher matcher) {
-        ZooKeeperServer zooKeeperServer = bkEnsemble.getZkServer();
-        ServerCnxn zkServerConnection = bkEnsemble.getZookeeperServerConnection(zooKeeper);
-        ZooKeeperServer spyZooKeeperServer = spy(zooKeeperServer);
-
-        // Spy zk server connection to close connection to cause connection loss after namespace-bundle
-        // deleted successfully. This mimics crash of connected zk server after committing requested operation.
-        Whitebox.setInternalState(zkServerConnection, "zkServer", spyZooKeeperServer);
-        doAnswer(invocation -> {
-            Request request = invocation.getArgument(0);
-            if (request.sessionId != zooKeeper.getSessionId()) {
-                return invocation.callRealMethod();
-            }
-            if (!matcher.match(request)) {
-                return invocation.callRealMethod();
-            }
-            Whitebox.setInternalState(zkServerConnection, "zkServer", zooKeeperServer);
-            bkEnsemble.disconnectZookeeper(zooKeeper);
-            return null;
-        }).when(spyZooKeeperServer).submitRequest(any(Request.class));
-    }
-
-    private void spyZookeeperToDisconnectAfterPersist(ZooKeeper zooKeeper, RequestMatcher matcher) {
-        ZooKeeperServer zooKeeperServer = bkEnsemble.getZkServer();
-        ServerCnxn zkServerConnection = bkEnsemble.getZookeeperServerConnection(zooKeeper);
-        ZooKeeperServer spyZooKeeperServer = spy(zooKeeperServer);
-
-        // Spy zk server connection to close connection to cause connection loss after namespace-bundle
-        // deleted successfully. This mimics crash of connected zk server after committing requested operation.
-        Whitebox.setInternalState(zkServerConnection, "zkServer", spyZooKeeperServer);
-        MutableBoolean disconnected = new MutableBoolean();
-        doAnswer(invocation -> {
-            Request request = invocation.getArgument(0);
-            if (request.sessionId != zooKeeper.getSessionId()) {
-                return invocation.callRealMethod();
-            }
-            if (!matcher.match(request)) {
-                return invocation.callRealMethod();
-            }
-
-            ServerCnxn spyZkServerConnection1 = spy(zkServerConnection);
-            doAnswer(responseInvocation -> {
-                synchronized (disconnected) {
-                    ReplyHeader replyHeader = responseInvocation.getArgument(0);
-                    if (replyHeader.getXid() == request.cxid && replyHeader.getErr() == 0) {
-                        Whitebox.setInternalState(zkServerConnection, "zkServer", zooKeeperServer);
-                        disconnected.setTrue();
-                        bkEnsemble.disconnectZookeeper(zooKeeper);
-                    } else if (disconnected.isFalse()) {
-                        return responseInvocation.callRealMethod();
-                    }
-                    return null;
-                }
-            }).when(spyZkServerConnection1).sendResponse(any(ReplyHeader.class), nullable(Record.class), any(String.class));
-            Whitebox.setInternalState(request, "cnxn", spyZkServerConnection1);
-            return invocation.callRealMethod();
-        }).when(spyZooKeeperServer).submitRequest(any(Request.class));
-    }
-
     @Test
     public void testReestablishOwnershipAfterInvalidateCache() throws Exception {
         String topic1 = "persistent://my-tenant/my-ns/topic-1";
@@ -254,12 +168,11 @@ public class TopicOwnerTest {
         Assert.assertEquals(pulsarAdmins[4].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl());
 
         OwnershipCache ownershipCache1 = pulsar1.getNamespaceService().getOwnershipCache();
-        AsyncLoadingCache<NamespaceBundle, OwnedBundle> ownedBundlesCache1 = Whitebox.getInternalState(ownershipCache1, "ownedBundlesCache");
 
         leaderAuthorizedBroker.setValue(null);
 
         Assert.assertNotNull(ownershipCache1.getOwnedBundle(namespaceBundle));
-        ownedBundlesCache1.synchronous().invalidate(namespaceBundle);
+        ownershipCache1.invalidateLocalOwnerCache(namespaceBundle);
         Assert.assertNull(ownershipCache1.getOwnedBundle(namespaceBundle));
 
         // pulsar1 is still owner in zk.
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
index 72435aa265e..6ad17b7d27e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
@@ -27,6 +27,7 @@ import lombok.Cleanup;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.mledger.ManagedCursorMXBean;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics;
@@ -41,7 +42,6 @@ import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.PulsarTestClient;
 import org.apache.pulsar.common.stats.Metrics;
 import org.awaitility.Awaitility;
-import org.powermock.reflect.Whitebox;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -171,7 +171,8 @@ public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest {
             producer.send(message.getBytes());
             consumer.acknowledge(consumer.receive().getMessageId());
             // Make BK error.
-            LedgerHandle ledgerHandle = Whitebox.getInternalState(managedCursor, "cursorLedger");
+            LedgerHandle ledgerHandle = (LedgerHandle) FieldUtils.readField(
+                    managedCursor, "cursorLedger", true);
             ledgerHandle.close();
             return managedCursorMXBean.getPersistLedgerErrors() > 0
                     && managedCursorMXBean.getPersistZookeeperSucceed() > 0;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index 4e03f839213..b13c8ff2cb2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -19,12 +19,19 @@
 package org.apache.pulsar.broker.systopic;
 
 import com.google.common.collect.Sets;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
 import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.lang3.reflect.MethodUtils;
 import org.apache.pulsar.broker.admin.impl.BrokersBase;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerTestBase;
@@ -33,6 +40,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.ListTopicsOptions;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
@@ -40,26 +48,18 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.SystemTopicNames;
-import org.apache.pulsar.common.policies.data.BacklogQuota;
-import org.apache.pulsar.common.policies.data.TenantInfo;
-import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.naming.TopicVersion;
+import org.apache.pulsar.common.policies.data.BacklogQuota;
+import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.awaitility.Awaitility;
 import org.mockito.Mockito;
-import org.powermock.reflect.Whitebox;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
 
 @Test(groups = "broker")
 public class PartitionedSystemTopicTest extends BrokerTestBase {
@@ -207,7 +207,10 @@ public class PartitionedSystemTopicTest extends BrokerTestBase {
         config.setMinimumRolloverTime(1, TimeUnit.SECONDS);
         config.setMaximumRolloverTime(1, TimeUnit.SECONDS);
         persistentTopic.getManagedLedger().setConfig(config);
-        Whitebox.invokeMethod(persistentTopic.getManagedLedger(), "updateLastLedgerCreatedTimeAndScheduleRolloverTask");
+        MethodUtils.invokeMethod(
+                persistentTopic.getManagedLedger(),
+                true,
+                "updateLastLedgerCreatedTimeAndScheduleRolloverTask");
         String msg1 = "msg-1";
         producer.send(msg1);
         Thread.sleep(3 * 1000);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index d0674721c00..ffc351e9413 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -60,6 +60,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import lombok.Cleanup;
+import lombok.Lombok;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.util.Bytes;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -71,6 +72,8 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.commons.lang3.reflect.MethodUtils;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.intercept.CounterBrokerInterceptor;
@@ -138,7 +141,6 @@ import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig
 import org.awaitility.Awaitility;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
-import org.powermock.reflect.Whitebox;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -1244,8 +1246,8 @@ public class TransactionTest extends TransactionTestBase {
         for (int i = 0; i < 10; i++) {
             producer.newMessage().value(Bytes.toBytes(i)).send();
         }
-        ClientCnx cnx = Whitebox.invokeMethod(consumer, "cnx");
-        Whitebox.invokeMethod(consumer, "connectionClosed", cnx);
+        ClientCnx cnx = (ClientCnx) MethodUtils.invokeMethod(consumer, true, "cnx");
+        MethodUtils.invokeMethod(consumer, true, "connectionClosed", cnx);
 
         Message<byte[]> message = consumer.receive();
         Transaction transaction = pulsarClient
@@ -1391,39 +1393,37 @@ public class TransactionTest extends TransactionTestBase {
      */
     @Test
     public void testTBRecoverChangeStateError() throws InterruptedException, TimeoutException {
-        final AtomicReference<PersistentTopic> persistentTopic = new AtomicReference<PersistentTopic>();
-        AtomicInteger atomicInteger = new AtomicInteger(1);
+        final AtomicReference<PersistentTopic> persistentTopic = new AtomicReference<>();
         // Create Executor
-        ScheduledExecutorService executorService_recover = mock(ScheduledExecutorService.class);
+        ScheduledExecutorService executorServiceRecover = mock(ScheduledExecutorService.class);
         // Mock serviceConfiguration.
         ServiceConfiguration serviceConfiguration = mock(ServiceConfiguration.class);
         when(serviceConfiguration.isEnableReplicatedSubscriptions()).thenReturn(false);
         when(serviceConfiguration.isTransactionCoordinatorEnabled()).thenReturn(true);
         // Mock executorProvider.
         ExecutorProvider executorProvider = mock(ExecutorProvider.class);
-        when(executorProvider.getExecutor(any(Object.class))).thenReturn(executorService_recover);
+        when(executorProvider.getExecutor(any(Object.class))).thenReturn(executorServiceRecover);
         // Mock pendingAckStore.
         PendingAckStore pendingAckStore = mock(PendingAckStore.class);
-        doAnswer(new Answer() {
-            @Override
-            public Object answer(InvocationOnMock invocation) throws Throwable {
-                new Thread(() -> {
-                    TopicTransactionBuffer.TopicTransactionBufferRecover recover
-                            = (TopicTransactionBuffer.TopicTransactionBufferRecover)invocation.getArguments()[0];
-                    TopicTransactionBufferRecoverCallBack callBack
-                            = Whitebox.getInternalState(recover, "callBack");;
-                    try {
-                        persistentTopic.get().getTransactionBuffer().closeAsync().get();
-                    } catch (InterruptedException e) {
-                        throw new RuntimeException(e);
-                    } catch (ExecutionException e) {
-                        throw new RuntimeException(e);
+        doAnswer(invocation -> {
+            new Thread(() -> {
+                TopicTransactionBuffer.TopicTransactionBufferRecover recover
+                        = (TopicTransactionBuffer.TopicTransactionBufferRecover) invocation.getArguments()[0];
+                TopicTransactionBufferRecoverCallBack callBack = null;
+                try {
+                    callBack = (TopicTransactionBufferRecoverCallBack) FieldUtils.readField(
+                            recover, "callBack", true);
+                    persistentTopic.get().getTransactionBuffer().closeAsync().get();
+                } catch (Exception e) {
+                    throw Lombok.sneakyThrow(e);
+                } finally {
+                    if (callBack != null) {
+                        callBack.recoverComplete();
                     }
-                    callBack.recoverComplete();
-                }).start();
-                return null;
-            }
-        }).when(executorService_recover).execute(any());
+                }
+            }).start();
+            return null;
+        }).when(executorServiceRecover).execute(any());
         // Mock executorProvider.
         TransactionPendingAckStoreProvider pendingAckStoreProvider = mock(TransactionPendingAckStoreProvider.class);
         when(pendingAckStoreProvider.checkInitializedBefore(any()))
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
index 576ef647248..ba7b8b6b40f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.transaction.buffer;
 
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.transaction.TransactionTestBase;
 import org.apache.pulsar.client.api.Producer;
@@ -29,7 +30,6 @@ import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
 import org.awaitility.Awaitility;
-import org.powermock.reflect.Whitebox;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -83,7 +83,7 @@ public class TopicTransactionBufferTest extends TransactionTestBase {
         producer.newMessage(txn).value("test".getBytes()).send();
         PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0)
                 .getBrokerService().getTopic(TopicName.get(topic).toString(), false).get().get();
-        Whitebox.setInternalState(persistentTopic.getManagedLedger(), "state", ManagedLedgerImpl.State.WriteFailed);
+        FieldUtils.writeField(persistentTopic.getManagedLedger(), "state", ManagedLedgerImpl.State.WriteFailed, true);
         txn.commit().get();
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
index 4cc29f396cf..0922c9b801b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
@@ -29,7 +29,6 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -61,7 +60,6 @@ import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
 import org.awaitility.Awaitility;
-import org.powermock.reflect.Whitebox;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -324,9 +322,11 @@ public class TransactionLowWaterMarkTest extends TransactionTestBase {
         field.setAccessible(true);
         field.set(txn1, TransactionImpl.State.OPEN);
 
-        AtomicLong pendingWriteOps = Whitebox.getInternalState(getPulsarServiceList().get(0)
-                .getBrokerService().getTopic(TopicName.get(TOPIC).toString(),
-                        false).get().get(), "pendingWriteOps");
+        PersistentTopic t = (PersistentTopic) getPulsarServiceList().get(0)
+                .getBrokerService()
+                .getTopic(TopicName.get(TOPIC).toString(), false)
+                .get()
+                .orElseThrow();
         try {
             producer.newMessage(txn1).send();
             fail();
@@ -334,7 +334,7 @@ public class TransactionLowWaterMarkTest extends TransactionTestBase {
             // no-op
         }
 
-        assertEquals(pendingWriteOps.get(), 0);
+        assertEquals(t.getPendingWriteOps().get(), 0);
     }
 
     @Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java
index c35d15d96da..58be651a7d1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java
@@ -19,6 +19,15 @@
 package org.apache.pulsar.broker.transaction.pendingack;
 
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
@@ -40,23 +49,11 @@ import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.util.collections.BitSetRecyclable;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.awaitility.Awaitility;
-import org.powermock.reflect.Whitebox;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import java.lang.reflect.Field;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.TimeUnit;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-
 @Slf4j
 @Test(groups = "broker")
 public class PendingAckInMemoryDeleteTest extends TransactionTestBase {
@@ -309,12 +306,15 @@ public class PendingAckInMemoryDeleteTest extends TransactionTestBase {
 
         consumer.acknowledgeAsync(consumer.receive().getMessageId(), commitTxn).get();
 
-        PendingAckHandle pendingAckHandle = Whitebox.getInternalState(getPulsarServiceList().get(0)
-                .getBrokerService().getTopic("persistent://" + normalTopic, false).get().get()
-                .getSubscription(subscriptionName), "pendingAckHandle");
-
+        Topic t = getPulsarServiceList().get(0)
+                .getBrokerService()
+                .getTopic("persistent://" + normalTopic, false)
+                .get()
+                .orElseThrow();
+        PersistentSubscription subscription = (PersistentSubscription) t.getSubscription(subscriptionName);
+        PendingAckHandleImpl pendingAckHandle = (PendingAckHandleImpl) subscription.getPendingAckHandle();
         Map<PositionImpl, MutablePair<PositionImpl, Integer>> individualAckPositions =
-                Whitebox.getInternalState(pendingAckHandle, "individualAckPositions");
+                pendingAckHandle.getIndividualAckPositions();
         // one message in pending ack state
         assertEquals(1, individualAckPositions.size());
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 2f572a841b0..669161f67c0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -23,6 +23,7 @@ import static org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertEquals;
@@ -75,10 +76,14 @@ import lombok.Data;
 import lombok.EqualsAndHashCode;
 import org.apache.avro.Schema.Parser;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.cache.EntryCache;
+import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
 import org.apache.commons.lang3.RandomUtils;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.schema.GenericRecord;
@@ -104,7 +109,6 @@ import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.awaitility.Awaitility;
-import org.powermock.reflect.Whitebox;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -1032,6 +1036,20 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
 
     }
 
+    @Override
+    protected void beforePulsarStartMocks(PulsarService pulsar) throws Exception {
+        super.beforePulsarStartMocks(pulsar);
+        doAnswer(i0 -> {
+            ManagedLedgerFactory factory = (ManagedLedgerFactory) spy(i0.callRealMethod());
+            doAnswer(i1 -> {
+                EntryCacheManager manager = (EntryCacheManager) spy(i1.callRealMethod());
+                doAnswer(i2 -> spy(i2.callRealMethod())).when(manager).getEntryCache(any());
+                return manager;
+            }).when(factory).getEntryCacheManager();
+            return factory;
+        }).when(pulsar).getManagedLedgerFactory();
+    }
+
     /**
      * Usecase 1: Only 1 Active Subscription - 1 subscriber - Produce Messages - EntryCache should cache messages -
      * EntryCache should be cleaned : Once active subscription consumes messages
@@ -1068,8 +1086,8 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
 
         PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get();
         ManagedLedgerImpl ledger = (ManagedLedgerImpl) topicRef.getManagedLedger();
-        EntryCache entryCache = spy((EntryCache) Whitebox.getInternalState(ledger, "entryCache"));
-        Whitebox.setInternalState(ledger, "entryCache", entryCache);
+
+        EntryCache entryCache = (EntryCache) FieldUtils.readField(ledger, "entryCache", true);
 
         Message<byte[]> msg;
         // 2. Produce messages
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
index c241b6e6cc7..4f9c9080646 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.api.v1;
 
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertEquals;
@@ -47,8 +48,12 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import lombok.Cleanup;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.cache.EntryCache;
+import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
@@ -71,7 +76,6 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.awaitility.Awaitility;
-import org.powermock.reflect.Whitebox;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -608,6 +612,20 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
         }
     }
 
+    @Override
+    protected void beforePulsarStartMocks(PulsarService pulsar) throws Exception {
+        super.beforePulsarStartMocks(pulsar);
+        doAnswer(i0 -> {
+            ManagedLedgerFactory factory = (ManagedLedgerFactory) spy(i0.callRealMethod());
+            doAnswer(i1 -> {
+                EntryCacheManager manager = (EntryCacheManager) spy(i1.callRealMethod());
+                doAnswer(i2 -> spy(i2.callRealMethod())).when(manager).getEntryCache(any());
+                return manager;
+            }).when(factory).getEntryCacheManager();
+            return factory;
+        }).when(pulsar).getManagedLedgerFactory();
+    }
+
     /**
      * Usecase 1: Only 1 Active Subscription - 1 subscriber - Produce Messages - EntryCache should cache messages -
      * EntryCache should be cleaned : Once active subscription consumes messages
@@ -648,10 +666,9 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
         PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get();
         ManagedLedgerImpl ledger = (ManagedLedgerImpl) topicRef.getManagedLedger();
 
-        EntryCache entryCache = spy((EntryCache) Whitebox.getInternalState(ledger, "entryCache"));
-        Whitebox.setInternalState(ledger, "entryCache", entryCache);
+        EntryCache entryCache = (EntryCache) FieldUtils.readField(ledger, "entryCache", true);
 
-        Message<byte[]>msg = null;
+        Message<byte[]> msg = null;
         // 2. Produce messages
         for (int i = 0; i < 30; i++) {
             String message = "my-message-" + i;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
index 25a2539582e..c5d03038039 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
@@ -20,16 +20,11 @@ package org.apache.pulsar.client.impl;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNull;
-
-import java.lang.reflect.Field;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
-
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -38,7 +33,6 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
-import org.powermock.reflect.Whitebox;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
@@ -267,7 +261,7 @@ public class NegativeAcksTest extends ProducerConsumerBase {
     public void testNegativeAcksDeleteFromUnackedTracker() throws Exception {
         String topic = BrokerTestUtil.newUniqueName("testNegativeAcksDeleteFromUnackedTracker");
         @Cleanup
-        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+        ConsumerImpl<String> consumer = (ConsumerImpl<String>) pulsarClient.newConsumer(Schema.STRING)
                 .topic(topic)
                 .subscriptionName("sub1")
                 .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
@@ -282,18 +276,15 @@ public class NegativeAcksTest extends ProducerConsumerBase {
         BatchMessageIdImpl batchMessageId2 = new BatchMessageIdImpl(3, 1, 0, 1);
         BatchMessageIdImpl batchMessageId3 = new BatchMessageIdImpl(3, 1, 0, 2);
 
-        UnAckedMessageTracker unAckedMessageTracker = ((ConsumerImpl) consumer).getUnAckedMessageTracker();
+        UnAckedMessageTracker unAckedMessageTracker = consumer.getUnAckedMessageTracker();
         unAckedMessageTracker.add(topicMessageId);
 
-        Field fieldNegativeAcksTracker = Whitebox.getField(ConsumerImpl.class, "negativeAcksTracker");
-        NegativeAcksTracker negativeAcksTracker = (NegativeAcksTracker) fieldNegativeAcksTracker.get(((ConsumerImpl) consumer));
-        Field fieldNackedMessages = Whitebox.getField(NegativeAcksTracker.class, "nackedMessages");
         // negative topic message id
         consumer.negativeAcknowledge(topicMessageId);
-        HashMap<MessageId, Long> nackedMessages = (HashMap)fieldNackedMessages.get(negativeAcksTracker);
-        assertEquals(nackedMessages.size(), 1);
+        NegativeAcksTracker negativeAcksTracker = consumer.getNegativeAcksTracker();
+        assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse(-1).intValue(), 1);
         assertEquals(unAckedMessageTracker.size(), 0);
-        nackedMessages.clear();
+        negativeAcksTracker.close();
         // negative batch message id
         unAckedMessageTracker.add(batchMessageId);
         unAckedMessageTracker.add(batchMessageId2);
@@ -301,9 +292,9 @@ public class NegativeAcksTest extends ProducerConsumerBase {
         consumer.negativeAcknowledge(batchMessageId);
         consumer.negativeAcknowledge(batchMessageId2);
         consumer.negativeAcknowledge(batchMessageId3);
-        assertEquals(nackedMessages.size(), 1);
+        assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse(-1).intValue(), 1);
         assertEquals(unAckedMessageTracker.size(), 0);
-        nackedMessages.clear();
+        negativeAcksTracker.close();
     }
 
     @Test(timeOut = 10000)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 5e84a30e867..38251a7ad86 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
 import static org.apache.pulsar.common.protocol.Commands.hasChecksum;
 import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ComparisonChain;
 import com.google.common.collect.Iterables;
 import com.scurrilous.circe.checksum.Crc32cIntChecksum;
@@ -391,6 +392,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         return unAckedMessageTracker;
     }
 
+    @VisibleForTesting
+    NegativeAcksTracker getNegativeAcksTracker() {
+        return negativeAcksTracker;
+    }
+
     @Override
     public CompletableFuture<Void> unsubscribeAsync() {
         if (getState() == State.Closing || getState() == State.Closed) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
index 6273f4d582e..86121dd2c34 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
@@ -19,11 +19,13 @@
 package org.apache.pulsar.client.impl;
 
 import static org.apache.pulsar.client.impl.UnAckedMessageTracker.addChunkedMessageIdsAndRemoveFromSequenceMap;
+import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.Timeout;
 import io.netty.util.Timer;
 import java.io.Closeable;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.pulsar.client.api.Message;
@@ -123,6 +125,11 @@ class NegativeAcksTracker implements Closeable {
         }
     }
 
+    @VisibleForTesting
+    Optional<Integer> getNackedMessagesCount() {
+        return Optional.ofNullable(nackedMessages).map(HashMap::size);
+    }
+
     @Override
     public synchronized void close() {
         if (timeout != null && !timeout.isCancelled()) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index b0e78273dc6..6f0b75fb540 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -994,7 +994,7 @@ public class PulsarClientImpl implements PulsarClient {
     }
 
     @VisibleForTesting
-    void setLookup(LookupService lookup) {
+    public void setLookup(LookupService lookup) {
         this.lookup = lookup;
     }
 
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
index cc2a3519565..7f3d5bfd5d5 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
@@ -396,8 +396,6 @@ public class AvroSchemaTest {
         // Because data does not conform to schema expect a crash
         Assert.assertThrows( SchemaSerializationException.class, () -> avroWriter.write(badNasaMissionData));
 
-        // Get the buffered data using powermock
-
         // Assert that the buffer position is reset to zero
         Assert.assertEquals(avroWriter.getEncoder().bytesBuffered(), 0);
     }