You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2019/07/10 06:17:13 UTC

[pulsar] branch master updated: Upgrade to Mockito 2.x (#4671)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 108780c  Upgrade to Mockito 2.x (#4671)
108780c is described below

commit 108780c79cc9ec71f4ff8ee3a7231766ef545894
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Jul 9 23:17:07 2019 -0700

    Upgrade to Mockito 2.x (#4671)
    
    Upgrading to Mockito 2.28 and PowerMock 2.0. This is a pre-step to be able to run CI with Java 11 / 12
---
 buildtools/pom.xml                                 |    9 +-
 distribution/server/src/assemble/LICENSE.bin.txt   |    4 +-
 .../bookkeeper/client/PulsarMockReadHandle.java    |    6 +-
 .../bookkeeper/mledger/impl/EntryCacheTest.java    |   15 +-
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java |    6 +-
 .../mledger/impl/OffloadPrefixReadTest.java        |   22 +-
 pom.xml                                            |   21 +-
 .../pulsar/broker/admin/AdminApiOffloadTest.java   |   22 +-
 .../apache/pulsar/broker/admin/NamespacesTest.java |  253 +----
 .../broker/auth/AuthenticationServiceTest.java     |    2 +-
 .../broker/cache/ResourceQuotaCacheTest.java       |    2 +-
 .../delayed/InMemoryDeliveryTrackerTest.java       |    8 +-
 .../broker/namespace/NamespaceServiceTest.java     |    2 +-
 .../broker/namespace/OwnershipCacheTest.java       |    5 +-
 .../pulsar/broker/service/BrokerServiceTest.java   |    6 +-
 .../PersistentDispatcherFailoverConsumerTest.java  |   21 +-
 .../service/PersistentTopicConcurrentTest.java     |    2 +-
 .../pulsar/broker/service/PersistentTopicTest.java |   55 +-
 .../pulsar/broker/service/ReplicatorTest.java      |   12 +-
 .../pulsar/broker/service/ServerCnxTest.java       |   44 +-
 .../persistent/PersistentSubscriptionTest.java     |   62 +-
 ...ReplicatedSubscriptionsSnapshotBuilderTest.java |    4 +-
 ...RegistryServiceWithSchemaDataValidatorTest.java |    6 +-
 .../pulsar/client/api/BrokerServiceLookupTest.java |    2 +-
 .../client/api/SimpleProducerConsumerTest.java     |    2 +-
 .../client/api/v1/V1_ProducerConsumerTest.java     |    2 +-
 .../client/impl/BrokerClientIntegrationTest.java   |   16 +-
 .../pulsar/common/naming/NamespaceBundleTest.java  |    2 +-
 .../pulsar/common/naming/NamespaceBundlesTest.java |    4 +-
 .../apache/pulsar/compaction/CompactionTest.java   |    2 +-
 .../apache/pulsar/io/PulsarFunctionTlsTest.java    |    2 +-
 .../clients/producer/PulsarKafkaProducerTest.java  |   61 +-
 .../KafkaProducerInterceptorWrapperTest.java       |    2 +-
 .../apache/pulsar/admin/cli/CmdFunctionsTest.java  |    6 +-
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |   36 +-
 .../org/apache/pulsar/admin/cli/TestCmdSinks.java  |    4 +-
 .../apache/pulsar/admin/cli/TestCmdSources.java    |    9 +-
 .../org/apache/pulsar/client/impl/MessageImpl.java |    5 +
 .../pulsar/client/api/MessageRouterTest.java       |    2 +-
 .../apache/pulsar/client/impl/ClientCnxTest.java   |    6 +-
 .../apache/pulsar/client/impl/MessageImplTest.java |    2 +-
 .../client/impl/ProducerBuilderImplTest.java       |    6 +-
 .../schema/SupportVersioningAvroSchemaTest.java    |    2 +-
 .../SupportVersioningKeyValueSchemaTest.java       |    2 +-
 .../impl/schema/generic/GenericAvroSchemaTest.java |    4 +-
 .../impl/schema/generic/GenericSchemaImplTest.java |    2 +-
 .../MultiVersionSchemaInfoProviderTest.java        |    9 +-
 .../apache/pulsar/client/util/ObjectCacheTest.java |    3 +-
 .../apache/pulsar/common/naming/TopicNameTest.java |    2 +
 .../pulsar/common/protocol/ByteBufPairTest.java    |    2 +-
 .../pulsar/common/protocol/PulsarDecoderTest.java  |    2 +-
 .../connectors/pulsar/PulsarAvroTableSinkTest.java |   10 +-
 .../pulsar/PulsarConsumerSourceTests.java          |    6 +-
 .../connectors/pulsar/PulsarJsonTableSinkTest.java |   11 +-
 .../pulsar/functions/source/PulsarSource.java      |    2 +-
 .../pulsar/functions/instance/ContextImplTest.java |   35 +-
 .../instance/JavaInstanceRunnableProcessTest.java  | 1012 --------------------
 .../instance/state/StateContextImplTest.java       |    6 +-
 .../pulsar/functions/sink/PulsarSinkTest.java      |   95 +-
 .../pulsar/functions/source/PulsarSourceTest.java  |   39 +-
 .../windowing/WindowFunctionExecutorTest.java      |   22 +-
 .../KubernetesSecretsTokenAuthProviderTest.java    |   20 +-
 .../EnvironmentBasedSecretsProviderTest.java       |    6 +-
 .../apache/pulsar/functions/utils/ActionsTest.java |    2 +-
 .../worker/ClusterServiceCoordinatorTest.java      |    4 +-
 .../functions/worker/FunctionActionerTest.java     |    3 +-
 .../worker/FunctionMetaDataManagerTest.java        |  147 ++-
 .../worker/FunctionMetaDataTopicTailerTest.java    |    4 +-
 .../worker/FunctionRuntimeManagerTest.java         |  108 +--
 .../functions/worker/MembershipManagerTest.java    |   27 +-
 .../functions/worker/SchedulerManagerTest.java     |    8 +-
 .../functions/worker/dlog/DLInputStreamTest.java   |    6 +-
 .../functions/worker/dlog/DLOutputStreamTest.java  |    2 +-
 .../worker/executor/MockExecutorController.java    |   20 +-
 .../worker/request/ServiceRequestManagerTest.java  |    2 +-
 .../worker/rest/api/FunctionsImplTest.java         |    6 +-
 .../rest/api/v2/FunctionApiV2ResourceTest.java     |   57 +-
 .../rest/api/v3/FunctionApiV3ResourceTest.java     |   56 +-
 .../worker/rest/api/v3/SinkApiV3ResourceTest.java  |   16 +-
 .../rest/api/v3/SourceApiV3ResourceTest.java       |   56 +-
 .../pulsar/io/file/FileConsumerThreadTests.java    |    2 +-
 .../pulsar/io/file/FileListingThreadTests.java     |    2 +-
 .../pulsar/io/file/ProcessedFileThreadTests.java   |    2 +-
 .../io/influxdb/InfluxDBGenericRecordSinkTest.java |    6 +-
 .../org/apache/pulsar/io/mongodb/MongoSink.java    |   39 +-
 .../apache/pulsar/io/mongodb/MongoSinkTest.java    |   40 +-
 .../pulsar/log4j2/appender/PulsarAppender.java     |    1 +
 .../pulsar/log4j2/appender/PulsarAppenderTest.java |   29 +-
 pulsar-sql/presto-distribution/LICENSE             |    3 +-
 pulsar-sql/presto-distribution/pom.xml             |    7 +
 .../pulsar/sql/presto/TestPulsarConnector.java     |    6 +-
 .../pulsar/sql/presto/TestPulsarMetadata.java      |    2 +-
 .../pulsar/sql/presto/TestPulsarSplitManager.java  |    6 +-
 .../org/apache/pulsar/storm/PulsarSpoutTest.java   |   31 +-
 tiered-storage/jcloud/pom.xml                      |    5 +
 .../jcloud/BlobStoreBackedInputStreamTest.java     |    8 +-
 .../impl/BlobStoreManagedLedgerOffloaderTest.java  |    6 +-
 .../impl/BlockAwareSegmentInputStreamTest.java     |    6 +-
 98 files changed, 755 insertions(+), 2022 deletions(-)

diff --git a/buildtools/pom.xml b/buildtools/pom.xml
index f895e81..a6f1daf 100644
--- a/buildtools/pom.xml
+++ b/buildtools/pom.xml
@@ -38,7 +38,7 @@
     <dependency>
       <groupId>org.testng</groupId>
       <artifactId>testng</artifactId>
-      <version>6.13.1</version>
+      <version>6.14.3</version>
     </dependency>
     <dependency>
       <groupId>org.apache.logging.log4j</groupId>
@@ -67,6 +67,13 @@
           <header>../src/license-header.txt</header>
         </configuration>
       </plugin>
+      <plugin>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.8</source>
+          <target>1.8</target>
+        </configuration>
+      </plugin>
     </plugins>
     <extensions>
       <extension>
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index f59e6c9..76c30cb 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -421,7 +421,7 @@ The Apache Software License, Version 2.0
  * Google Error Prone Annotations - com.google.errorprone-error_prone_annotations-2.2.0.jar
  * OkHttp - com.squareup.okhttp-okhttp-2.5.0.jar
  * Okio - com.squareup.okio-okio-1.13.0.jar
- * Javassist -- org.javassist-javassist-3.21.0-GA.jar
+ * Javassist -- org.javassist-javassist-3.25.0-GA.jar
   * gRPC
     - io.grpc-grpc-all-1.18.0.jar
     - io.grpc-grpc-auth-1.18.0.jar
@@ -455,7 +455,7 @@ The Apache Software License, Version 2.0
   * Snappy Java
     - org.xerial.snappy-snappy-java-1.1.1.3.jar
   * Objenesis
-    - org.objenesis-objenesis-2.1.jar
+    - org.objenesis-objenesis-2.6.jar
   * Squareup
     - com.squareup.okhttp-logging-interceptor-2.7.5.jar
     - com.squareup.okhttp-okhttp-ws-2.7.5.jar
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java b/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java
index 30bcf46..7458573 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java
@@ -80,7 +80,11 @@ class PulsarMockReadHandle implements ReadHandle {
 
     @Override
     public long getLastAddConfirmed() {
-        return entries.get(entries.size() - 1).getEntryId();
+        if (entries.isEmpty()) {
+            return -1;
+        } else {
+            return entries.get(entries.size() - 1).getEntryId();
+        }
     }
 
     @Override
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java
index c48ea24..aa2730c 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java
@@ -18,19 +18,24 @@
  */
 package org.apache.bookkeeper.mledger.impl;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Mockito.*;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 
 import io.netty.buffer.Unpooled;
+
 import java.lang.reflect.Method;
 import java.util.List;
 import java.util.Vector;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+
 import org.apache.bookkeeper.client.BKException.BKNoSuchLedgerExistsException;
-import org.apache.bookkeeper.client.api.BKException;
 import org.apache.bookkeeper.client.api.LedgerEntries;
 import org.apache.bookkeeper.client.api.LedgerEntry;
 import org.apache.bookkeeper.client.api.ReadHandle;
@@ -39,8 +44,6 @@ import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 72d19ce..4f27611 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -18,8 +18,8 @@
  */
 package org.apache.bookkeeper.mledger.impl;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -95,9 +95,9 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.Ledge
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
-import org.apache.pulsar.common.protocol.ByteBufPair;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
+import org.apache.pulsar.common.protocol.ByteBufPair;
 import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException.Code;
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
index 08e88ee..038cbe4 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
@@ -18,10 +18,10 @@
  */
 package org.apache.bookkeeper.mledger.impl;
 
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.anyMap;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.eq;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -32,7 +32,6 @@ import com.google.common.collect.Lists;
 import io.netty.buffer.ByteBuf;
 
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -49,23 +48,16 @@ import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
 import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
-
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class OffloadPrefixReadTest extends MockedBookKeeperTestCase {
-    private static final Logger log = LoggerFactory.getLogger(OffloadPrefixReadTest.class);
-
     @Test
     public void testOffloadRead() throws Exception {
         MockLedgerOffloader offloader = spy(new MockLedgerOffloader());
@@ -101,21 +93,21 @@ public class OffloadPrefixReadTest extends MockedBookKeeperTestCase {
             Assert.assertEquals(new String(e.getData()), "entry-" + i++);
         }
         verify(offloader, times(1))
-            .readOffloaded(anyLong(), anyObject(), anyMap());
+            .readOffloaded(anyLong(), any(), anyMap());
         verify(offloader).readOffloaded(anyLong(), eq(firstLedgerUUID), anyMap());
 
         for (Entry e : cursor.readEntries(10)) {
             Assert.assertEquals(new String(e.getData()), "entry-" + i++);
         }
         verify(offloader, times(2))
-            .readOffloaded(anyLong(), anyObject(), anyMap());
+            .readOffloaded(anyLong(), any(), anyMap());
         verify(offloader).readOffloaded(anyLong(), eq(secondLedgerUUID), anyMap());
 
         for (Entry e : cursor.readEntries(5)) {
             Assert.assertEquals(new String(e.getData()), "entry-" + i++);
         }
         verify(offloader, times(2))
-            .readOffloaded(anyLong(), anyObject(), anyMap());
+            .readOffloaded(anyLong(), any(), anyMap());
     }
 
     static class MockLedgerOffloader implements LedgerOffloader {
diff --git a/pom.xml b/pom.xml
index 8c64e95..ef19d94 100644
--- a/pom.xml
+++ b/pom.xml
@@ -205,6 +205,10 @@ flexible messaging model and an intuitive client API.</description>
     <disruptor.version>3.4.0</disruptor.version>
     <testcontainers.version>1.11.2</testcontainers.version>
     <kerby.version>1.1.1</kerby.version>
+    <testng.version>6.14.3</testng.version>
+    <mockito.version>2.28.2</mockito.version>
+    <powermock.version>2.0.2</powermock.version>
+    <javassist.version>3.25.0-GA</javassist.version>
 
     <!-- Plugin dependencies -->
     <protobuf-maven-plugin.version>0.6.1</protobuf-maven-plugin.version>
@@ -239,25 +243,25 @@ flexible messaging model and an intuitive client API.</description>
       <dependency>
         <groupId>org.testng</groupId>
         <artifactId>testng</artifactId>
-        <version>6.14.3</version>
+        <version>${testng.version}</version>
       </dependency>
 
       <dependency>
         <groupId>org.mockito</groupId>
         <artifactId>mockito-core</artifactId>
-        <version>1.10.19</version>
+        <version>${mockito.version}</version>
       </dependency>
 
       <dependency>
         <groupId>org.powermock</groupId>
-        <artifactId>powermock-api-mockito</artifactId>
-        <version>1.7.3</version>
+        <artifactId>powermock-api-mockito2</artifactId>
+        <version>${powermock.version}</version>
       </dependency>
 
       <dependency>
         <groupId>org.powermock</groupId>
         <artifactId>powermock-module-testng</artifactId>
-        <version>1.7.3</version>
+        <version>${powermock.version}</version>
       </dependency>
 
       <dependency>
@@ -1003,6 +1007,11 @@ flexible messaging model and an intuitive client API.</description>
         <groupId>joda-time</groupId>
         <artifactId>joda-time</artifactId>
         <version>${joda.version}</version>
+      </dependency>      
+      <dependency>
+        <groupId>org.javassist</groupId>
+        <artifactId>javassist</artifactId>
+        <version>${javassist.version}</version>
       </dependency>
     </dependencies>
   </dependencyManagement>
@@ -1030,7 +1039,7 @@ flexible messaging model and an intuitive client API.</description>
 
     <dependency>
       <groupId>org.powermock</groupId>
-      <artifactId>powermock-api-mockito</artifactId>
+      <artifactId>powermock-api-mockito2</artifactId>
       <scope>test</scope>
     </dependency>
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
index e4b889e..72591c6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.broker.admin;
 
-import static org.mockito.Matchers.anyObject;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -29,33 +29,23 @@ import com.google.common.collect.Sets;
 
 import java.util.concurrent.CompletableFuture;
 
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
+import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-@Slf4j
 public class AdminApiOffloadTest extends MockedPulsarServiceBaseTest {
 
-    private static final Logger LOG = LoggerFactory.getLogger(AdminApiOffloadTest.class);
-
     @BeforeMethod
     @Override
     public void setup() throws Exception {
@@ -84,10 +74,10 @@ public class AdminApiOffloadTest extends MockedPulsarServiceBaseTest {
         doReturn(offloader).when(pulsar).getManagedLedgerOffloader();
 
         CompletableFuture<Void> promise = new CompletableFuture<>();
-        doReturn(promise).when(offloader).offload(anyObject(), anyObject(), anyObject());
+        doReturn(promise).when(offloader).offload(any(), any(), any());
 
         MessageId currentId = MessageId.latest;
-        try (Producer p = pulsarClient.newProducer().topic(topicName).enableBatching(false).create()) {
+        try (Producer<byte[]> p = pulsarClient.newProducer().topic(topicName).enableBatching(false).create()) {
             for (int i = 0; i < 15; i++) {
                 currentId = p.send("Foobar".getBytes());
             }
@@ -120,7 +110,7 @@ public class AdminApiOffloadTest extends MockedPulsarServiceBaseTest {
 
         // Try again
         doReturn(CompletableFuture.completedFuture(null))
-            .when(offloader).offload(anyObject(), anyObject(), anyObject());
+            .when(offloader).offload(any(), any(), any());
 
         admin.topics().triggerOffload(topicName, currentId);
 
@@ -131,7 +121,7 @@ public class AdminApiOffloadTest extends MockedPulsarServiceBaseTest {
         Assert.assertEquals(firstUnoffloaded.getLedgerId(), info.ledgers.get(1).ledgerId);
         Assert.assertEquals(firstUnoffloaded.getEntryId(), 0);
 
-        verify(offloader, times(2)).offload(anyObject(), anyObject(), anyObject());
+        verify(offloader, times(2)).offload(any(), any(), any());
     }
 
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index c615a09..138404e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -19,7 +19,7 @@
 package org.apache.pulsar.broker.admin;
 
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
-import static org.mockito.Matchers.any;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
@@ -34,6 +34,9 @@ import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
 import java.lang.reflect.Field;
 import java.net.URI;
 import java.net.URL;
@@ -69,26 +72,20 @@ import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.TenantInfo;
-import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.ZooDefs;
-import org.hamcrest.Description;
-import org.hamcrest.Matcher;
+import org.mockito.ArgumentMatcher;
 import org.mockito.Mockito;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
 @Test
 public class NamespacesTest extends MockedPulsarServiceBaseTest {
 
@@ -560,26 +557,11 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
 
         // setup to redirect to another broker in the same cluster
         doReturn(Optional.of(new URL("http://otherhost" + ":" + BROKER_WEBSERVICE_PORT))).when(nsSvc)
-                .getWebServiceUrl(Mockito.argThat(new Matcher<NamespaceName>() {
-
+                .getWebServiceUrl(Mockito.argThat(new ArgumentMatcher<NamespaceName>() {
                     @Override
-                    public void describeTo(Description description) {
-                        // TODO Auto-generated method stub
-
-                    }
-
-                    @Override
-                    public boolean matches(Object item) {
-                        NamespaceName nsname = (NamespaceName) item;
+                    public boolean matches(NamespaceName nsname) {
                         return nsname.equals(NamespacesTest.this.testGlobalNamespaces.get(0));
                     }
-
-                    @Override
-                    public void _dont_implement_Matcher___instead_extend_BaseMatcher_() {
-                        // TODO Auto-generated method stub
-
-                    }
-
                 }), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyBoolean());
 
         admin.namespaces().setNamespaceReplicationClusters(testGlobalNamespaces.get(0).toString(),
@@ -631,7 +613,8 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
         // delete the topic from ZK
         mockZookKeeper.delete("/managed-ledgers/" + topicName.getPersistenceNamingEncoding(), -1);
 
-        ZkUtils.createFullPathOptimistic(mockZookKeeper, "/admin/partitioned-topics/" + topicName.getPersistenceNamingEncoding(),
+        ZkUtils.createFullPathOptimistic(mockZookKeeper,
+                "/admin/partitioned-topics/" + topicName.getPersistenceNamingEncoding(),
                 new byte[0], null, null);
         try {
             namespaces.deleteNamespace(testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), false);
@@ -675,67 +658,27 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
         createBundledTestNamespaces(this.testTenant, this.testLocalCluster, bundledNsLocal, bundleData);
         final NamespaceName testNs = NamespaceName.get(this.testTenant, this.testLocalCluster, bundledNsLocal);
 
-        org.apache.pulsar.client.admin.Namespaces namespacesAdmin = mock(org.apache.pulsar.client.admin.Namespaces.class);
+        org.apache.pulsar.client.admin.Namespaces namespacesAdmin = mock(
+                org.apache.pulsar.client.admin.Namespaces.class);
         doReturn(namespacesAdmin).when(admin).namespaces();
 
-        doReturn(null).when(nsSvc).getWebServiceUrl(Mockito.argThat(new Matcher<NamespaceBundle>() {
-
-            @Override
-            public void describeTo(Description description) {
-            }
-
-            @Override
-            public boolean matches(Object item) {
-                if (item instanceof NamespaceBundle) {
-                    NamespaceBundle bundle = (NamespaceBundle) item;
-                    return bundle.getNamespaceObject().equals(testNs);
-                }
-                return false;
-            }
-
+        doReturn(null).when(nsSvc).getWebServiceUrl(Mockito.argThat(new ArgumentMatcher<NamespaceBundle>() {
             @Override
-            public void _dont_implement_Matcher___instead_extend_BaseMatcher_() {
+            public boolean matches(NamespaceBundle bundle) {
+                return bundle.getNamespaceObject().equals(testNs);
             }
-
         }), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyBoolean());
-        doReturn(false).when(nsSvc).isServiceUnitOwned(Mockito.argThat(new Matcher<NamespaceBundle>() {
-
-            @Override
-            public void describeTo(Description description) {
-            }
-
+        doReturn(false).when(nsSvc).isServiceUnitOwned(Mockito.argThat(new ArgumentMatcher<NamespaceBundle>() {
             @Override
-            public boolean matches(Object item) {
-                if (item instanceof NamespaceBundle) {
-                    NamespaceBundle bundle = (NamespaceBundle) item;
-                    return bundle.getNamespaceObject().equals(testNs);
-                }
-                return false;
+            public boolean matches(NamespaceBundle bundle) {
+                return bundle.getNamespaceObject().equals(testNs);
             }
-
-            @Override
-            public void _dont_implement_Matcher___instead_extend_BaseMatcher_() {
-            }
-
         }));
         doReturn(Optional.of(new NamespaceEphemeralData())).when(nsSvc)
-                .getOwner(Mockito.argThat(new Matcher<NamespaceBundle>() {
-
+                .getOwner(Mockito.argThat(new ArgumentMatcher<NamespaceBundle>() {
                     @Override
-                    public void describeTo(Description description) {
-                    }
-
-                    @Override
-                    public boolean matches(Object item) {
-                        if (item instanceof NamespaceBundle) {
-                            NamespaceBundle bundle = (NamespaceBundle) item;
-                            return bundle.getNamespaceObject().equals(testNs);
-                        }
-                        return false;
-                    }
-
-                    @Override
-                    public void _dont_implement_Matcher___instead_extend_BaseMatcher_() {
+                    public boolean matches(NamespaceBundle bundle) {
+                        return bundle.getNamespaceObject().equals(testNs);
                     }
                 }));
 
@@ -760,7 +703,8 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
 
         NamespaceBundles nsBundles = nsSvc.getNamespaceBundleFactory().getBundles(testNs, bundleData);
         // make one bundle owned
-        doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(nsBundles.getBundles().get(0), false, true, false);
+        doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(nsBundles.getBundles().get(0), false,
+                true, false);
         doReturn(true).when(nsSvc).isServiceUnitOwned(nsBundles.getBundles().get(0));
         doNothing().when(namespacesAdmin).deleteNamespaceBundle(
                 testTenant + "/" + testLocalCluster + "/" + bundledNsLocal, "0x00000000_0x80000000");
@@ -793,54 +737,8 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
         final NamespaceName testNs = this.testLocalNamespaces.get(1);
         URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress());
         doReturn(Optional.of(localWebServiceUrl)).when(nsSvc)
-                .getWebServiceUrl(Mockito.argThat(new Matcher<NamespaceBundle>() {
-
-                    @Override
-                    public void describeTo(Description description) {
-                        // TODO Auto-generated method stub
-
-                    }
-
-                    @Override
-                    public boolean matches(Object item) {
-                        if (item instanceof NamespaceName) {
-                            NamespaceName ns = (NamespaceName) item;
-                            return ns.equals(testNs);
-                        }
-                        return false;
-                    }
-
-                    @Override
-                    public void _dont_implement_Matcher___instead_extend_BaseMatcher_() {
-                        // TODO Auto-generated method stub
-
-                    }
-
-                }), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyBoolean());
-        doReturn(true).when(nsSvc).isServiceUnitOwned(Mockito.argThat(new Matcher<NamespaceBundle>() {
-
-            @Override
-            public void describeTo(Description description) {
-                // TODO Auto-generated method stub
-
-            }
-
-            @Override
-            public boolean matches(Object item) {
-                if (item instanceof NamespaceName) {
-                    NamespaceName ns = (NamespaceName) item;
-                    return ns.equals(testNs);
-                }
-                return false;
-            }
-
-            @Override
-            public void _dont_implement_Matcher___instead_extend_BaseMatcher_() {
-                // TODO Auto-generated method stub
-
-            }
-
-        }));
+                .getWebServiceUrl(Mockito.argThat(ns -> ns.equals(testNs)), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyBoolean());
+        doReturn(true).when(nsSvc).isServiceUnitOwned(Mockito.argThat(ns -> ns.equals(testNs)));
 
         NamespaceBundle bundle = nsSvc.getNamespaceBundleFactory().getFullBundle(testNs);
         doNothing().when(namespaces).validateBundleOwnership(bundle, false, true);
@@ -914,54 +812,10 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
         final NamespaceName testNs = NamespaceName.get(this.testTenant, this.testLocalCluster, bundledNsLocal);
 
         doReturn(Optional.of(localWebServiceUrl)).when(nsSvc)
-                .getWebServiceUrl(Mockito.argThat(new Matcher<NamespaceBundle>() {
-
-                    @Override
-                    public void describeTo(Description description) {
-                        // TODO Auto-generated method stub
-
-                    }
-
-                    @Override
-                    public boolean matches(Object item) {
-                        if (item instanceof NamespaceBundle) {
-                            NamespaceBundle bundle = (NamespaceBundle) item;
-                            return bundle.getNamespaceObject().equals(testNs);
-                        }
-                        return false;
-                    }
-
-                    @Override
-                    public void _dont_implement_Matcher___instead_extend_BaseMatcher_() {
-                        // TODO Auto-generated method stub
-
-                    }
-
-                }), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyBoolean());
-        doReturn(true).when(nsSvc).isServiceUnitOwned(Mockito.argThat(new Matcher<NamespaceBundle>() {
-
-            @Override
-            public void describeTo(Description description) {
-                // TODO Auto-generated method stub
-
-            }
-
-            @Override
-            public boolean matches(Object item) {
-                if (item instanceof NamespaceBundle) {
-                    NamespaceBundle bundle = (NamespaceBundle) item;
-                    return bundle.getNamespaceObject().equals(testNs);
-                }
-                return false;
-            }
-
-            @Override
-            public void _dont_implement_Matcher___instead_extend_BaseMatcher_() {
-                // TODO Auto-generated method stub
-
-            }
-
-        }));
+                .getWebServiceUrl(Mockito.argThat(bundle -> bundle.getNamespaceObject().equals(testNs)),
+                        Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyBoolean());
+        doReturn(true).when(nsSvc)
+                .isServiceUnitOwned(Mockito.argThat(bundle -> bundle.getNamespaceObject().equals(testNs)));
 
         NamespaceBundles nsBundles = nsSvc.getNamespaceBundleFactory().getBundles(testNs, bundleData);
         NamespaceBundle testBundle = nsBundles.getBundles().get(0);
@@ -1119,7 +973,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
         doReturn("persistent").when(topics).domain();
 
         topics.validateTopicName(topicName.getTenant(), topicName.getCluster(),
-                                 topicName.getNamespacePortion(), topicName.getEncodedLocalName());
+                topicName.getNamespacePortion(), topicName.getEncodedLocalName());
         topics.validateAdminOperationOnTopic(false);
     }
 
@@ -1163,7 +1017,8 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
     public void testSubscribeRate() throws Exception {
         SubscribeRate subscribeRate = new SubscribeRate(1, 5);
         String namespace = "my-tenants/my-namespace";
-        admin.tenants().createTenant("my-tenants", new TenantInfo(Sets.newHashSet(), Sets.newHashSet(testLocalCluster)));
+        admin.tenants().createTenant("my-tenants",
+                new TenantInfo(Sets.newHashSet(), Sets.newHashSet(testLocalCluster)));
         admin.namespaces().createNamespace(namespace, Sets.newHashSet(testLocalCluster));
         admin.namespaces().setSubscribeRate(namespace, subscribeRate);
         assertEquals(subscribeRate, admin.namespaces().getSubscribeRate(namespace));
@@ -1171,7 +1026,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
 
         admin.topics().createPartitionedTopic(topicName, 2);
         pulsar.getConfiguration().setAuthorizationEnabled(false);
-        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+        Consumer<?> consumer = pulsarClient.newConsumer()
                 .topic(topicName)
                 .subscriptionType(SubscriptionType.Shared)
                 .subscriptionName("subscribe-rate")
@@ -1202,43 +1057,9 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
 
     private void mockWebUrl(URL localWebServiceUrl, NamespaceName namespace) throws Exception {
         doReturn(Optional.of(localWebServiceUrl)).when(nsSvc)
-                .getWebServiceUrl(Mockito.argThat(new Matcher<NamespaceBundle>() {
-                    @Override
-                    public void describeTo(Description description) {
-                    }
-
-                    @Override
-                    public boolean matches(Object item) {
-                        if (item instanceof NamespaceBundle) {
-                            NamespaceBundle bundle = (NamespaceBundle) item;
-                            return bundle.getNamespaceObject().equals(namespace);
-                        }
-                        return false;
-                    }
-
-                    @Override
-                    public void _dont_implement_Matcher___instead_extend_BaseMatcher_() {
-                    }
-                }), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyBoolean());
-        doReturn(true).when(nsSvc).isServiceUnitOwned(Mockito.argThat(new Matcher<NamespaceBundle>() {
-            @Override
-            public void describeTo(Description description) {
-            }
-
-            @Override
-            public boolean matches(Object item) {
-                if (item instanceof NamespaceBundle) {
-                    NamespaceBundle bundle = (NamespaceBundle) item;
-                    return bundle.getNamespaceObject().equals(namespace);
-                }
-                return false;
-            }
-
-            @Override
-            public void _dont_implement_Matcher___instead_extend_BaseMatcher_() {
-            }
-        }));
+                .getWebServiceUrl(Mockito.argThat(bundle -> bundle.getNamespaceObject().equals(namespace)),
+                        Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyBoolean());
+        doReturn(true).when(nsSvc)
+                .isServiceUnitOwned(Mockito.argThat(bundle -> bundle.getNamespaceObject().equals(namespace)));
     }
-
-    private static final Logger log = LoggerFactory.getLogger(NamespacesTest.class);
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthenticationServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthenticationServiceTest.java
index 6600566..f75fa54 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthenticationServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthenticationServiceTest.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.broker.auth;
 
-import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/ResourceQuotaCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/ResourceQuotaCacheTest.java
index d07fae3..6fb4286 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/ResourceQuotaCacheTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/ResourceQuotaCacheTest.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.broker.cache;
 
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
index fbfc4d2..6f2cdf0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
@@ -18,8 +18,6 @@
  */
 package org.apache.pulsar.broker.delayed;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.*;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
@@ -112,9 +110,9 @@ public class InMemoryDeliveryTrackerTest {
         NavigableMap<Long, TimerTask> tasks = new TreeMap<>();
 
         when(timer.newTimeout(any(), anyLong(), any())).then(invocation -> {
-            TimerTask task = invocation.getArgumentAt(0, TimerTask.class);
-            long timeout = invocation.getArgumentAt(1, Long.class);
-            TimeUnit unit = invocation.getArgumentAt(2, TimeUnit.class);
+            TimerTask task = invocation.getArgument(0, TimerTask.class);
+            long timeout = invocation.getArgument(1, Long.class);
+            TimeUnit unit = invocation.getArgument(2, TimeUnit.class);
             long scheduleAt = clockTime.get() + unit.toMillis(timeout);
             tasks.put(scheduleAt, task);
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index 84f54a7..6d9d398 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -20,7 +20,7 @@ package org.apache.pulsar.broker.namespace;
 
 import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
 import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath;
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
index d616e21..172f56d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
@@ -20,8 +20,7 @@ package org.apache.pulsar.broker.namespace;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.pulsar.broker.PulsarService.webAddress;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -87,7 +86,7 @@ public class OwnershipCacheTest {
         bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32());
         nsService = mock(NamespaceService.class);
         brokerService = mock(BrokerService.class);
-        doReturn(CompletableFuture.completedFuture(1)).when(brokerService).unloadServiceUnit(anyObject());
+        doReturn(CompletableFuture.completedFuture(1)).when(brokerService).unloadServiceUnit(any());
 
         doReturn(zkCache).when(pulsar).getLocalZkCache();
         doReturn(localCache).when(pulsar).getLocalZkCacheService();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 79d6c79..d0d5c28 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -20,7 +20,7 @@ package org.apache.pulsar.broker.service;
 
 import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
 import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath;
-import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
@@ -787,7 +787,7 @@ public class BrokerServiceTest extends BrokerTestBase {
         // create topic will fail to get managedLedgerConfig
         CompletableFuture<ManagedLedgerConfig> failedManagedLedgerConfig = new CompletableFuture<>();
         failedManagedLedgerConfig.completeExceptionally(new NullPointerException("failed to peristent policy"));
-        doReturn(failedManagedLedgerConfig).when(service).getManagedLedgerConfig(anyObject());
+        doReturn(failedManagedLedgerConfig).when(service).getManagedLedgerConfig(any());
 
         CompletableFuture<Void> topicCreation = new CompletableFuture<Void>();
 
@@ -830,7 +830,7 @@ public class BrokerServiceTest extends BrokerTestBase {
         // create topic will fail to get managedLedgerConfig
         CompletableFuture<ManagedLedgerConfig> failedManagedLedgerConfig = new CompletableFuture<>();
         failedManagedLedgerConfig.complete(new ManagedLedgerConfig());
-        doReturn(failedManagedLedgerConfig).when(service).getManagedLedgerConfig(anyObject());
+        doReturn(failedManagedLedgerConfig).when(service).getManagedLedgerConfig(any());
 
         CompletableFuture<Void> topicCreation = new CompletableFuture<Void>();
         // fail managed-ledger future
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index f3d1c970..ab96fd5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -20,10 +20,9 @@ package org.apache.pulsar.broker.service;
 
 import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
 import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.matches;
-import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.matches;
+import static org.mockito.Mockito.same;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -137,7 +136,7 @@ public class PersistentDispatcherFailoverConsumerTest {
         consumerChanges = new LinkedBlockingQueue<>();
         this.channelCtx = mock(ChannelHandlerContext.class);
         doAnswer(invocationOnMock -> {
-            ByteBuf buf = invocationOnMock.getArgumentAt(0, ByteBuf.class);
+            ByteBuf buf = invocationOnMock.getArgument(0);
 
             ByteBuf cmdBuf = buf.retainedSlice(4, buf.writerIndex() - 4);
             try {
@@ -203,7 +202,7 @@ public class PersistentDispatcherFailoverConsumerTest {
                 return null;
             }
         }).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class),
-                any(OpenLedgerCallback.class), anyObject());
+                any(OpenLedgerCallback.class), any());
 
         // call openLedgerFailed on ML factory asyncOpen
         doAnswer(new Answer<Object>() {
@@ -214,7 +213,7 @@ public class PersistentDispatcherFailoverConsumerTest {
                 return null;
             }
         }).when(mlFactoryMock).asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
-                any(OpenLedgerCallback.class), anyObject());
+                any(OpenLedgerCallback.class), any());
 
         // call addComplete on ledger asyncAddEntry
         doAnswer(new Answer<Object>() {
@@ -223,7 +222,7 @@ public class PersistentDispatcherFailoverConsumerTest {
                 ((AddEntryCallback) invocationOnMock.getArguments()[1]).addComplete(new PositionImpl(1, 1), null);
                 return null;
             }
-        }).when(ledgerMock).asyncAddEntry(any(byte[].class), any(AddEntryCallback.class), anyObject());
+        }).when(ledgerMock).asyncAddEntry(any(byte[].class), any(AddEntryCallback.class), any());
 
         // call openCursorComplete on cursor asyncOpen
         doAnswer(new Answer<Object>() {
@@ -232,7 +231,7 @@ public class PersistentDispatcherFailoverConsumerTest {
                 ((OpenCursorCallback) invocationOnMock.getArguments()[2]).openCursorComplete(cursorMock, null);
                 return null;
             }
-        }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(OpenCursorCallback.class), anyObject());
+        }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(OpenCursorCallback.class), any());
 
         // call deleteLedgerComplete on ledger asyncDelete
         doAnswer(new Answer<Object>() {
@@ -241,7 +240,7 @@ public class PersistentDispatcherFailoverConsumerTest {
                 ((DeleteLedgerCallback) invocationOnMock.getArguments()[0]).deleteLedgerComplete(null);
                 return null;
             }
-        }).when(ledgerMock).asyncDelete(any(DeleteLedgerCallback.class), anyObject());
+        }).when(ledgerMock).asyncDelete(any(DeleteLedgerCallback.class), any());
 
         doAnswer(new Answer<Object>() {
             @Override
@@ -249,7 +248,7 @@ public class PersistentDispatcherFailoverConsumerTest {
                 ((DeleteCursorCallback) invocationOnMock.getArguments()[1]).deleteCursorComplete(null);
                 return null;
             }
-        }).when(ledgerMock).asyncDeleteCursor(matches(".*success.*"), any(DeleteCursorCallback.class), anyObject());
+        }).when(ledgerMock).asyncDeleteCursor(matches(".*success.*"), any(DeleteCursorCallback.class), any());
     }
 
     private void verifyActiveConsumerChange(CommandActiveConsumerChange change,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
index 01895a8..d50a5b2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
@@ -19,7 +19,7 @@
 package org.apache.pulsar.broker.service;
 
 import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 325a735..5d7bbaf 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -21,11 +21,10 @@ package org.apache.pulsar.broker.service;
 import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
 import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.matches;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.matches;
 import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
@@ -165,7 +164,7 @@ public class PersistentTopicTest {
         ZooKeeperDataCache<Policies> zkDataCache = mock(ZooKeeperDataCache.class);
         doReturn(zkDataCache).when(configCacheService).policiesCache();
         doReturn(configCacheService).when(pulsar).getConfigurationCache();
-        doReturn(Optional.empty()).when(zkDataCache).get(anyString());
+        doReturn(Optional.empty()).when(zkDataCache).get(any());
 
         LocalZooKeeperCacheService zkCache = mock(LocalZooKeeperCacheService.class);
         doReturn(CompletableFuture.completedFuture(Optional.empty())).when(zkDataCache).getAsync(any());
@@ -183,8 +182,8 @@ public class PersistentTopicTest {
 
         NamespaceService nsSvc = mock(NamespaceService.class);
         doReturn(nsSvc).when(pulsar).getNamespaceService();
-        doReturn(true).when(nsSvc).isServiceUnitOwned(any(NamespaceBundle.class));
-        doReturn(true).when(nsSvc).isServiceUnitActive(any(TopicName.class));
+        doReturn(true).when(nsSvc).isServiceUnitOwned(any());
+        doReturn(true).when(nsSvc).isServiceUnitActive(any());
 
         setupMLAsyncCallbackMocks();
     }
@@ -214,7 +213,7 @@ public class PersistentTopicTest {
                 return null;
             }
         }).when(mlFactoryMock).asyncOpen(anyString(), any(ManagedLedgerConfig.class), any(OpenLedgerCallback.class),
-                anyObject());
+                any());
 
         CompletableFuture<Void> future = brokerService.getOrCreateTopic(topicName).thenAccept(topic -> {
             assertTrue(topic.toString().contains(topicName));
@@ -245,7 +244,7 @@ public class PersistentTopicTest {
                 return null;
             }
         }).when(mlFactoryMock).asyncOpen(anyString(), any(ManagedLedgerConfig.class), any(OpenLedgerCallback.class),
-                anyObject());
+                any());
 
         CompletableFuture<Topic> future = brokerService.getOrCreateTopic(jinxedTopicName);
 
@@ -327,7 +326,7 @@ public class PersistentTopicTest {
                         new ManagedLedgerException("Managed ledger failure"), invocationOnMock.getArguments()[2]);
                 return null;
             }
-        }).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), any(AddEntryCallback.class), anyObject());
+        }).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), any(AddEntryCallback.class), any());
 
         topic.publishMessage(payload, (exception, ledgerId, entryId) -> {
             if (exception == null) {
@@ -705,7 +704,7 @@ public class PersistentTopicTest {
                 Thread.sleep(1000);
                 return null;
             }
-        }).when(ledgerMock).asyncDeleteCursor(matches(".*success.*"), any(DeleteCursorCallback.class), anyObject());
+        }).when(ledgerMock).asyncDeleteCursor(matches(".*success.*"), any(DeleteCursorCallback.class), any());
 
         ExecutorService executor = Executors.newCachedThreadPool();
 
@@ -882,7 +881,7 @@ public class PersistentTopicTest {
                 ((DeleteLedgerCallback) invocationOnMock.getArguments()[0]).deleteLedgerComplete(null);
                 return null;
             }
-        }).when(ledgerMock).asyncDelete(any(DeleteLedgerCallback.class), anyObject());
+        }).when(ledgerMock).asyncDelete(any(DeleteLedgerCallback.class), any());
 
         ExecutorService executor = Executors.newCachedThreadPool();
 
@@ -960,7 +959,7 @@ public class PersistentTopicTest {
                 return null;
             }
         }).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class),
-                any(OpenLedgerCallback.class), anyObject());
+                any(OpenLedgerCallback.class), any());
 
         // call openLedgerFailed on ML factory asyncOpen
         doAnswer(new Answer<Object>() {
@@ -971,7 +970,7 @@ public class PersistentTopicTest {
                 return null;
             }
         }).when(mlFactoryMock).asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
-                any(OpenLedgerCallback.class), anyObject());
+                any(OpenLedgerCallback.class), any());
 
         // call addComplete on ledger asyncAddEntry
         doAnswer(new Answer<Object>() {
@@ -981,7 +980,7 @@ public class PersistentTopicTest {
                         invocationOnMock.getArguments()[2]);
                 return null;
             }
-        }).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), any(AddEntryCallback.class), anyObject());
+        }).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), any(AddEntryCallback.class), any());
 
         // call openCursorComplete on cursor asyncOpen
         doAnswer(new Answer<Object>() {
@@ -990,7 +989,7 @@ public class PersistentTopicTest {
                 ((OpenCursorCallback) invocationOnMock.getArguments()[2]).openCursorComplete(cursorMock, null);
                 return null;
             }
-        }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(OpenCursorCallback.class), anyObject());
+        }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(OpenCursorCallback.class), any());
 
         doAnswer(new Answer<Object>() {
             @Override
@@ -999,7 +998,7 @@ public class PersistentTopicTest {
                 return null;
             }
         }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(Map.class),
-                any(OpenCursorCallback.class), anyObject());
+                any(OpenCursorCallback.class), any());
 
         // call deleteLedgerComplete on ledger asyncDelete
         doAnswer(new Answer<Object>() {
@@ -1008,7 +1007,7 @@ public class PersistentTopicTest {
                 ((DeleteLedgerCallback) invocationOnMock.getArguments()[0]).deleteLedgerComplete(null);
                 return null;
             }
-        }).when(ledgerMock).asyncDelete(any(DeleteLedgerCallback.class), anyObject());
+        }).when(ledgerMock).asyncDelete(any(DeleteLedgerCallback.class), any());
 
         doAnswer(new Answer<Object>() {
             @Override
@@ -1016,13 +1015,13 @@ public class PersistentTopicTest {
                 ((DeleteCursorCallback) invocationOnMock.getArguments()[1]).deleteCursorComplete(null);
                 return null;
             }
-        }).when(ledgerMock).asyncDeleteCursor(matches(".*success.*"), any(DeleteCursorCallback.class), anyObject());
+        }).when(ledgerMock).asyncDeleteCursor(matches(".*success.*"), any(DeleteCursorCallback.class), any());
 
         doAnswer((invokactionOnMock) -> {
                 ((MarkDeleteCallback) invokactionOnMock.getArguments()[2])
                     .markDeleteComplete(invokactionOnMock.getArguments()[3]);
                 return null;
-            }).when(cursorMock).asyncMarkDelete(anyObject(), anyObject(), any(MarkDeleteCallback.class), anyObject());
+            }).when(cursorMock).asyncMarkDelete(any(), any(), any(MarkDeleteCallback.class), any());
     }
 
     @Test
@@ -1185,7 +1184,7 @@ public class PersistentTopicTest {
         String localCluster = "local";
         String remoteCluster = "remote";
         final ManagedLedger ledgerMock = mock(ManagedLedger.class);
-        doNothing().when(ledgerMock).asyncDeleteCursor(anyObject(), anyObject(), anyObject());
+        doNothing().when(ledgerMock).asyncDeleteCursor(any(), any(), any());
         doReturn(new ArrayList<Object>()).when(ledgerMock).getCursors();
 
         PersistentTopic topic = new PersistentTopic(globalTopicName, ledgerMock, brokerService);
@@ -1219,7 +1218,7 @@ public class PersistentTopicTest {
 
         // step-3 : complete the callback to remove replicator from the list
         ArgumentCaptor<DeleteCursorCallback> captor = ArgumentCaptor.forClass(DeleteCursorCallback.class);
-        Mockito.verify(ledgerMock).asyncDeleteCursor(anyObject(), captor.capture(), anyObject());
+        Mockito.verify(ledgerMock).asyncDeleteCursor(any(), captor.capture(), any());
         DeleteCursorCallback callback = captor.getValue();
         callback.deleteCursorComplete(null);
     }
@@ -1231,7 +1230,7 @@ public class PersistentTopicTest {
         String localCluster = "local";
         String remoteCluster = "remote";
         final ManagedLedger ledgerMock = mock(ManagedLedger.class);
-        doNothing().when(ledgerMock).asyncDeleteCursor(anyObject(), anyObject(), anyObject());
+        doNothing().when(ledgerMock).asyncDeleteCursor(any(), any(), any());
         doReturn(new ArrayList<Object>()).when(ledgerMock).getCursors();
 
         PersistentTopic topic = new PersistentTopic(globalTopicName, ledgerMock, brokerService);
@@ -1252,7 +1251,7 @@ public class PersistentTopicTest {
         verify(clientImpl)
             .createProducerAsync(
                 any(ProducerConfigurationData.class),
-                any(Schema.class), eq(null)
+                any(), eq(null)
             );
 
         replicator.disconnect(false);
@@ -1260,11 +1259,7 @@ public class PersistentTopicTest {
 
         replicator.startProducer();
 
-        verify(clientImpl, Mockito.times(2))
-            .createProducerAsync(
-                any(ProducerConfigurationData.class),
-                any(Schema.class), any(null)
-            );
+        verify(clientImpl, Mockito.times(2)).createProducerAsync(any(), any(), any());
     }
 
     @Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 4a65b93..6f7731b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -18,13 +18,18 @@
  */
 package org.apache.pulsar.broker.service;
 
-import static org.mockito.Matchers.eq;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
+import com.google.common.collect.Sets;
+import com.scurrilous.circe.checksum.Crc32cIntChecksum;
+
+import io.netty.buffer.ByteBuf;
+
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.util.List;
@@ -75,11 +80,6 @@ import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 import org.testng.collections.Lists;
 
-import com.google.common.collect.Sets;
-import com.scurrilous.circe.checksum.Crc32cIntChecksum;
-
-import io.netty.buffer.ByteBuf;
-
 /**
  * Starts 3 brokers that are in 3 different clusters
  */
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 217b6f1..07b619c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -21,9 +21,8 @@ package org.apache.pulsar.broker.service;
 import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
 import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.matches;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.matches;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
@@ -100,7 +99,6 @@ import org.apache.pulsar.common.api.proto.PulsarApi.EncryptionKeys;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
 import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
-import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.Policies;
@@ -168,7 +166,7 @@ public class ServerCnxTest {
 
         configCacheService = mock(ConfigurationCacheService.class);
         ZooKeeperDataCache<Policies> zkDataCache = mock(ZooKeeperDataCache.class);
-        doReturn(Optional.empty()).when(zkDataCache).get(anyObject());
+        doReturn(Optional.empty()).when(zkDataCache).get(any());
         doReturn(zkDataCache).when(configCacheService).policiesCache();
         doReturn(configCacheService).when(pulsar).getConfigurationCache();
 
@@ -184,8 +182,8 @@ public class ServerCnxTest {
 
         namespaceService = mock(NamespaceService.class);
         doReturn(namespaceService).when(pulsar).getNamespaceService();
-        doReturn(true).when(namespaceService).isServiceUnitOwned(any(NamespaceBundle.class));
-        doReturn(true).when(namespaceService).isServiceUnitActive(any(TopicName.class));
+        doReturn(true).when(namespaceService).isServiceUnitOwned(any());
+        doReturn(true).when(namespaceService).isServiceUnitActive(any());
 
         setupMLAsyncCallbackMocks();
 
@@ -756,7 +754,7 @@ public class ServerCnxTest {
             });
             return null;
         }).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class),
-                any(OpenLedgerCallback.class), anyObject());
+                any(OpenLedgerCallback.class), any());
 
         // In a create producer timeout from client side we expect to see this sequence of commands :
         // 1. create producer
@@ -815,7 +813,7 @@ public class ServerCnxTest {
                 return null;
             }
         }).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class),
-                any(OpenLedgerCallback.class), anyObject());
+                any(OpenLedgerCallback.class), any());
 
         // In a create producer timeout from client side we expect to see this sequence of commands :
         // 1. create producer
@@ -893,7 +891,7 @@ public class ServerCnxTest {
             });
             return null;
         }).when(mlFactoryMock).asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
-                any(OpenLedgerCallback.class), anyObject());
+                any(OpenLedgerCallback.class), any());
 
         // In a create producer timeout from client side we expect to see this sequence of commands :
         // 1. create a failure producer which will timeout creation after 100msec
@@ -964,7 +962,7 @@ public class ServerCnxTest {
 
             return null;
         }).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class),
-                any(OpenLedgerCallback.class), anyObject());
+                any(OpenLedgerCallback.class), any());
 
         // In a subscribe timeout from client side we expect to see this sequence of commands :
         // 1. Subscribe
@@ -1037,7 +1035,7 @@ public class ServerCnxTest {
             });
             return null;
         }).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class),
-                any(OpenLedgerCallback.class), anyObject());
+                any(OpenLedgerCallback.class), any());
 
         CompletableFuture<Runnable> openTopicFail = new CompletableFuture<>();
         doAnswer(invocationOnMock -> {
@@ -1047,7 +1045,7 @@ public class ServerCnxTest {
             });
             return null;
         }).when(mlFactoryMock).asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
-                any(OpenLedgerCallback.class), anyObject());
+                any(OpenLedgerCallback.class), any());
 
         // In a subscribe timeout from client side we expect to see this sequence of commands :
         // 1. Subscribe against failtopic which will fail after 100msec
@@ -1444,7 +1442,7 @@ public class ServerCnxTest {
                 return null;
             }
         }).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class),
-                any(OpenLedgerCallback.class), anyObject());
+                any(OpenLedgerCallback.class), any());
 
         // call openLedgerFailed on ML factory asyncOpen
         doAnswer(new Answer<Object>() {
@@ -1459,7 +1457,7 @@ public class ServerCnxTest {
                 return null;
             }
         }).when(mlFactoryMock).asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
-                any(OpenLedgerCallback.class), anyObject());
+                any(OpenLedgerCallback.class), any());
 
         // call addComplete on ledger asyncAddEntry
         doAnswer(new Answer<Object>() {
@@ -1469,7 +1467,7 @@ public class ServerCnxTest {
                         invocationOnMock.getArguments()[2]);
                 return null;
             }
-        }).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), any(AddEntryCallback.class), anyObject());
+        }).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), any(AddEntryCallback.class), any());
 
         doAnswer(new Answer<Object>() {
             @Override
@@ -1478,7 +1476,7 @@ public class ServerCnxTest {
                 ((OpenCursorCallback) invocationOnMock.getArguments()[2]).openCursorComplete(cursorMock, null);
                 return null;
             }
-        }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(OpenCursorCallback.class), anyObject());
+        }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(OpenCursorCallback.class), any());
 
         doAnswer(new Answer<Object>() {
             @Override
@@ -1488,7 +1486,7 @@ public class ServerCnxTest {
                 return null;
             }
         }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(Map.class),
-                any(OpenCursorCallback.class), anyObject());
+                any(OpenCursorCallback.class), any());
 
         doAnswer(new Answer<Object>() {
             @Override
@@ -1498,7 +1496,7 @@ public class ServerCnxTest {
                         .openCursorFailed(new ManagedLedgerException("Managed ledger failure"), null);
                 return null;
             }
-        }).when(ledgerMock).asyncOpenCursor(matches(".*fail.*"), any(InitialPosition.class), any(OpenCursorCallback.class), anyObject());
+        }).when(ledgerMock).asyncOpenCursor(matches(".*fail.*"), any(InitialPosition.class), any(OpenCursorCallback.class), any());
 
         doAnswer(new Answer<Object>() {
             @Override
@@ -1509,7 +1507,7 @@ public class ServerCnxTest {
                 return null;
             }
         }).when(ledgerMock).asyncOpenCursor(matches(".*fail.*"), any(InitialPosition.class), any(Map.class),
-                any(OpenCursorCallback.class), anyObject());
+                any(OpenCursorCallback.class), any());
 
         doAnswer(new Answer<Object>() {
             @Override
@@ -1517,7 +1515,7 @@ public class ServerCnxTest {
                 ((DeleteCursorCallback) invocationOnMock.getArguments()[1]).deleteCursorComplete(null);
                 return null;
             }
-        }).when(ledgerMock).asyncDeleteCursor(matches(".*success.*"), any(DeleteCursorCallback.class), anyObject());
+        }).when(ledgerMock).asyncDeleteCursor(matches(".*success.*"), any(DeleteCursorCallback.class), any());
 
         doAnswer(new Answer<Object>() {
             @Override
@@ -1526,7 +1524,7 @@ public class ServerCnxTest {
                         .deleteCursorFailed(new ManagedLedgerException("Managed ledger failure"), null);
                 return null;
             }
-        }).when(ledgerMock).asyncDeleteCursor(matches(".*fail.*"), any(DeleteCursorCallback.class), anyObject());
+        }).when(ledgerMock).asyncDeleteCursor(matches(".*fail.*"), any(DeleteCursorCallback.class), any());
 
         doAnswer(new Answer<Object>() {
             @Override
@@ -1534,7 +1532,7 @@ public class ServerCnxTest {
                 ((CloseCallback) invocationOnMock.getArguments()[0]).closeComplete(null);
                 return null;
             }
-        }).when(cursorMock).asyncClose(any(CloseCallback.class), anyObject());
+        }).when(cursorMock).asyncClose(any(CloseCallback.class), any());
 
         doReturn(successSubName).when(cursorMock).getName();
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
index 9e65326..2d1d092 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
@@ -18,8 +18,30 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
+import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
+import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
-import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.Position;
@@ -33,8 +55,8 @@ import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.PersistentTopicTest;
-import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.compaction.Compactor;
 import org.apache.pulsar.transaction.common.exception.TransactionConflictException;
@@ -49,32 +71,6 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-
-import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
-import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
-import static org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyList;
-import static org.mockito.Matchers.anyMap;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-
 @PrepareForTest({ ZooKeeperDataCache.class, BrokerService.class })
 public class PersistentSubscriptionTest {
 
@@ -170,14 +166,14 @@ public class PersistentSubscriptionTest {
             ((AsyncCallbacks.DeleteCallback) invocationOnMock.getArguments()[1])
                     .deleteComplete(invocationOnMock.getArguments()[2]);
             return null;
-        }).when(cursorMock).asyncDelete(anyList(), any(AsyncCallbacks.DeleteCallback.class), anyObject());
+        }).when(cursorMock).asyncDelete(any(List.class), any(AsyncCallbacks.DeleteCallback.class), any());
 
         doAnswer((invocationOnMock) -> {
             assertTrue(((PositionImpl)invocationOnMock.getArguments()[0]).compareTo(new PositionImpl(3, 100)) == 0);
             ((AsyncCallbacks.MarkDeleteCallback) invocationOnMock.getArguments()[2])
                     .markDeleteComplete(invocationOnMock.getArguments()[3]);
             return null;
-        }).when(cursorMock).asyncMarkDelete(anyObject(), anyObject(), any(AsyncCallbacks.MarkDeleteCallback.class), anyObject());
+        }).when(cursorMock).asyncMarkDelete(any(), any(), any(AsyncCallbacks.MarkDeleteCallback.class), any());
 
         List<Position> positions = new ArrayList<>();
         positions.add(new PositionImpl(1, 1));
@@ -197,8 +193,8 @@ public class PersistentSubscriptionTest {
         persistentSubscription.commitTxn(txnID1, Collections.emptyMap());
 
         // Verify corresponding ledger method was called with expected args.
-        verify(cursorMock, times(1)).asyncDelete(anyList(), any(), any());
-        verify(cursorMock, times(1)).asyncMarkDelete(any(), anyMap(), anyObject(), any());
+        verify(cursorMock, times(1)).asyncDelete(any(List.class), any(), any());
+        verify(cursorMock, times(1)).asyncMarkDelete(any(), any(Map.class), any(), any());
     }
 
     @Test
@@ -216,7 +212,7 @@ public class PersistentSubscriptionTest {
             ((AsyncCallbacks.DeleteCallback) invocationOnMock.getArguments()[1])
                     .deleteComplete(invocationOnMock.getArguments()[2]);
             return null;
-        }).when(cursorMock).asyncDelete(anyList(), any(AsyncCallbacks.DeleteCallback.class), anyObject());
+        }).when(cursorMock).asyncDelete(any(List.class), any(AsyncCallbacks.DeleteCallback.class), any());
 
         doReturn(PulsarApi.CommandSubscribe.SubType.Exclusive).when(consumerMock).subType();
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilderTest.java
index b4d1521..0d5e351 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilderTest.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -67,7 +67,7 @@ public class ReplicatedSubscriptionsSnapshotBuilderTest {
         controller = mock(ReplicatedSubscriptionsController.class);
         when(controller.localCluster()).thenReturn(localCluster);
         doAnswer(invocation -> {
-            ByteBuf marker = invocation.getArgumentAt(0, ByteBuf.class);
+            ByteBuf marker = invocation.getArgument(0, ByteBuf.class);
             Commands.skipMessageMetadata(marker);
             markers.add(marker);
             return null;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidatorTest.java
index aa1cfbb..885a9e9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidatorTest.java
@@ -18,9 +18,9 @@
  */
 package org.apache.pulsar.broker.service.schema.validator;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.same;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index 49206ff..51459fc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -19,7 +19,7 @@
 package org.apache.pulsar.client.api;
 
 import static org.apache.bookkeeper.test.PortManager.nextFreePort;
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 130a35b..23b6f80 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -19,7 +19,7 @@
 package org.apache.pulsar.client.api;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
index f437eda..8fdc5ed 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.client.api.v1;
 
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index c89fd20..e0aec98 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -20,7 +20,7 @@ package org.apache.pulsar.client.impl;
 
 import static java.util.UUID.randomUUID;
 import static org.apache.pulsar.broker.service.BrokerService.BROKER_SERVICE_CONFIGURATION_PATH;
-import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
@@ -153,7 +153,7 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase {
         doAnswer(invocationOnMock -> {
             cons1.connectionClosed((ClientCnx) invocationOnMock.getArguments()[0]);
             return null;
-        }).when(consumer1).connectionClosed(anyObject());
+        }).when(consumer1).connectionClosed(any());
         ProducerImpl<byte[]> producer1 = spy(prod1);
         doAnswer(invocationOnMock -> prod1.getState()).when(producer1).getState();
         doAnswer(invocationOnMock -> prod1.getClientCnx()).when(producer1).getClientCnx();
@@ -161,7 +161,7 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase {
         doAnswer(invocationOnMock -> {
             prod1.connectionClosed((ClientCnx) invocationOnMock.getArguments()[0]);
             return null;
-        }).when(producer1).connectionClosed(anyObject());
+        }).when(producer1).connectionClosed(any());
         ProducerImpl<byte[]> producer2 = spy(prod2);
         doAnswer(invocationOnMock -> prod2.getState()).when(producer2).getState();
         doAnswer(invocationOnMock -> prod2.getClientCnx()).when(producer2).getClientCnx();
@@ -169,7 +169,7 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase {
         doAnswer(invocationOnMock -> {
             prod2.connectionClosed((ClientCnx) invocationOnMock.getArguments()[0]);
             return null;
-        }).when(producer2).connectionClosed(anyObject());
+        }).when(producer2).connectionClosed(any());
 
         ClientCnx clientCnx = producer1.getClientCnx();
 
@@ -200,11 +200,11 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase {
         // let server send signal to close-connection and client close the connection
         Thread.sleep(1000);
         // [1] Verify: producer1 must get connectionClosed signal
-        verify(producer1, atLeastOnce()).connectionClosed(anyObject());
+        verify(producer1, atLeastOnce()).connectionClosed(any());
         // [2] Verify: consumer1 must get connectionClosed signal
-        verify(consumer1, atLeastOnce()).connectionClosed(anyObject());
+        verify(consumer1, atLeastOnce()).connectionClosed(any());
         // [3] Verify: producer2 should have not received connectionClosed signal
-        verify(producer2, never()).connectionClosed(anyObject());
+        verify(producer2, never()).connectionClosed(any());
 
         // sleep for sometime to let other disconnected producer and consumer connect again: but they should not get
         // connected with same broker as that broker is already out from active-broker list
@@ -224,7 +224,7 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase {
         pulsar.getNamespaceService().unloadNamespaceBundle((NamespaceBundle) bundle2);
         // let producer2 give some time to get disconnect signal and get disconencted
         Thread.sleep(200);
-        verify(producer2, atLeastOnce()).connectionClosed(anyObject());
+        verify(producer2, atLeastOnce()).connectionClosed(any());
 
         // producer1 must not be able to connect again
         assertTrue(prod1.getClientCnx() == null);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundleTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundleTest.java
index 064d357..e7a528c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundleTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundleTest.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.common.naming;
 
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java
index a8c504d..7ef0e32 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.common.naming;
 
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -169,7 +169,7 @@ public class NamespaceBundlesTest {
             bundles = new NamespaceBundles(topicName.getNamespaceObject(), newPar, factory);
             bundles.findBundle(topicName);
             fail("Should have failed due to out-of-range");
-        } catch (ArrayIndexOutOfBoundsException iae) {
+        } catch (IndexOutOfBoundsException iae) {
             // OK, expected
         }
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 1e68f80..5437327 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.compaction;
 
-import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.anyLong;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertNull;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
index aba83a7..41782a4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.io;
 
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
index 1ded3c6..bf19a18 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
@@ -18,9 +18,29 @@
  */
 package org.apache.kafka.clients.producer;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.anyVararg;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
+
 import org.apache.avro.reflect.Nullable;
 import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -35,8 +55,6 @@ import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
 import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -45,25 +63,6 @@ import org.testng.IObjectFactory;
 import org.testng.annotations.ObjectFactory;
 import org.testng.annotations.Test;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.anyVararg;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
 @PrepareForTest({PulsarClientKafkaConfig.class, PulsarProducerKafkaConfig.class})
 @PowerMockIgnore({"org.apache.logging.log4j.*", "org.apache.kafka.clients.producer.ProducerInterceptor"})
 public class PulsarKafkaProducerTest {
@@ -96,26 +95,20 @@ public class PulsarKafkaProducerTest {
     public void testPulsarKafkaProducer() {
         ClientBuilder mockClientBuilder = mock(ClientBuilder.class);
         ProducerBuilder mockProducerBuilder = mock(ProducerBuilder.class);
-        doAnswer(new Answer() {
-            @Override
-            public Object answer(InvocationOnMock invocation) throws Throwable {
-                Assert.assertEquals((int)invocation.getArguments()[0], 1000000, "Send time out is suppose to be 1000.");
-                return mockProducerBuilder;
-            }
+        doAnswer(invocation -> {
+            Assert.assertEquals((int)invocation.getArguments()[0], 1000000, "Send time out is suppose to be 1000.");
+            return mockProducerBuilder;
         }).when(mockProducerBuilder).sendTimeout(anyInt(), any(TimeUnit.class));
         doReturn(mockClientBuilder).when(mockClientBuilder).serviceUrl(anyString());
-        doAnswer(new Answer() {
-            @Override
-            public Object answer(InvocationOnMock invocation) throws Throwable {
-                Assert.assertEquals((int)invocation.getArguments()[0], 1000, "Keep alive interval is suppose to be 1000.");
-                return mockClientBuilder;
-            }
+        doAnswer(invocation -> {
+            Assert.assertEquals((int)invocation.getArguments()[0], 1000, "Keep alive interval is suppose to be 1000.");
+            return mockClientBuilder;
         }).when(mockClientBuilder).keepAliveInterval(anyInt(), any(TimeUnit.class));
 
         PowerMockito.mockStatic(PulsarClientKafkaConfig.class);
         PowerMockito.mockStatic(PulsarProducerKafkaConfig.class);
         when(PulsarClientKafkaConfig.getClientBuilder(any(Properties.class))).thenReturn(mockClientBuilder);
-        when(PulsarProducerKafkaConfig.getProducerBuilder(any(PulsarClient.class), any(Properties.class))).thenReturn(mockProducerBuilder);
+        when(PulsarProducerKafkaConfig.getProducerBuilder(any(), any())).thenReturn(mockProducerBuilder);
 
         Properties properties = new Properties();
         properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapperTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapperTest.java
index 0f15691..f96c9c7 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapperTest.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapperTest.java
@@ -48,7 +48,7 @@ import org.testng.annotations.Test;
 import java.util.Arrays;
 import java.util.Random;
 
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
index c1e40fb..e3bae43 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
@@ -19,9 +19,9 @@
 package org.apache.pulsar.admin.cli;
 
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index f028636..a296857 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -19,12 +19,14 @@
 package org.apache.pulsar.admin.cli;
 
 import static org.junit.Assert.assertNull;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.longThat;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
-import static org.mockito.Mockito.times;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -45,10 +47,10 @@ import org.apache.pulsar.client.admin.Namespaces;
 import org.apache.pulsar.client.admin.NonPersistentTopics;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.ResourceQuotas;
+import org.apache.pulsar.client.admin.Schemas;
 import org.apache.pulsar.client.admin.Tenants;
 import org.apache.pulsar.client.admin.Topics;
 import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl;
-import org.apache.pulsar.client.admin.Schemas;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
@@ -69,7 +71,6 @@ import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.mockito.ArgumentMatcher;
-import org.mockito.Matchers;
 import org.mockito.Mockito;
 import org.testng.annotations.Test;
 
@@ -98,7 +99,7 @@ public class PulsarAdminToolTest {
 
         brokers.run(split("update-dynamic-config --config brokerShutdownTimeoutMs --value 100"));
         verify(mockBrokers).updateDynamicConfiguration("brokerShutdownTimeoutMs", "100");
-        
+
         brokers.run(split("delete-dynamic-config --config brokerShutdownTimeoutMs"));
         verify(mockBrokers).deleteDynamicConfiguration("brokerShutdownTimeoutMs");
 
@@ -288,13 +289,13 @@ public class PulsarAdminToolTest {
                 .run(split("set-bookie-affinity-group myprop/clust/ns1 --primary-group test1 --secondary-group test2"));
         verify(mockNamespaces).setBookieAffinityGroup("myprop/clust/ns1",
                 new BookieAffinityGroupData("test1", "test2"));
-        
+
         namespaces.run(split("get-bookie-affinity-group myprop/clust/ns1"));
         verify(mockNamespaces).getBookieAffinityGroup("myprop/clust/ns1");
-        
+
         namespaces.run(split("delete-bookie-affinity-group myprop/clust/ns1"));
         verify(mockNamespaces).deleteBookieAffinityGroup("myprop/clust/ns1");
-        
+
         namespaces.run(split("unload myprop/clust/ns1"));
         verify(mockNamespaces).unload("myprop/clust/ns1");
 
@@ -653,10 +654,9 @@ public class PulsarAdminToolTest {
 
         // argument matcher for the timestamp in reset cursor. Since we can't verify exact timestamp, we check for a
         // range of +/- 1 second of the expected timestamp
-        class TimestampMatcher extends ArgumentMatcher<Long> {
+        class TimestampMatcher implements ArgumentMatcher<Long> {
             @Override
-            public boolean matches(Object argument) {
-                long timestamp = (Long) argument;
+            public boolean matches(Long timestamp) {
                 long expectedTimestamp = System.currentTimeMillis() - (1 * 60 * 1000);
                 if (timestamp < (expectedTimestamp + 1000) && timestamp > (expectedTimestamp - 1000)) {
                     return true;
@@ -665,8 +665,8 @@ public class PulsarAdminToolTest {
             }
         }
         cmdTopics.run(split("reset-cursor persistent://myprop/clust/ns1/ds1 -s sub1 -t 1m"));
-        verify(mockTopics).resetCursor(Matchers.eq("persistent://myprop/clust/ns1/ds1"), Matchers.eq("sub1"),
-                Matchers.longThat(new TimestampMatcher()));
+        verify(mockTopics).resetCursor(eq("persistent://myprop/clust/ns1/ds1"), eq("sub1"),
+                longThat(new TimestampMatcher()));
     }
 
     @Test
@@ -736,10 +736,9 @@ public class PulsarAdminToolTest {
 
         // argument matcher for the timestamp in reset cursor. Since we can't verify exact timestamp, we check for a
         // range of +/- 1 second of the expected timestamp
-        class TimestampMatcher extends ArgumentMatcher<Long> {
+        class TimestampMatcher implements ArgumentMatcher<Long> {
             @Override
-            public boolean matches(Object argument) {
-                long timestamp = (Long) argument;
+            public boolean matches(Long timestamp) {
                 long expectedTimestamp = System.currentTimeMillis() - (1 * 60 * 1000);
                 if (timestamp < (expectedTimestamp + 1000) && timestamp > (expectedTimestamp - 1000)) {
                     return true;
@@ -748,11 +747,10 @@ public class PulsarAdminToolTest {
             }
         }
         topics.run(split("reset-cursor persistent://myprop/clust/ns1/ds1 -s sub1 -t 1m"));
-        verify(mockTopics).resetCursor(Matchers.eq("persistent://myprop/clust/ns1/ds1"), Matchers.eq("sub1"),
-                Matchers.longThat(new TimestampMatcher()));
+        verify(mockTopics).resetCursor(eq("persistent://myprop/clust/ns1/ds1"), eq("sub1"),
+                longThat(new TimestampMatcher()));
     }
 
-
     @Test
     void nonPersistentTopics() throws Exception {
         PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
@@ -844,7 +842,7 @@ public class PulsarAdminToolTest {
         assertNull(atuh.getCertFilePath());
         assertNull(atuh.getKeyFilePath());
     }
-    
+
     String[] split(String s) {
         return s.split(" ");
     }
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
index 2a48586..f0418c9 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
@@ -20,7 +20,7 @@ package org.apache.pulsar.admin.cli;
 
 import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
 import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
-import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
@@ -251,7 +251,7 @@ public class TestCmdSinks {
                 sinkConfig
         );
     }
-    
+
     @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Sink archive not specfied")
     public void testMissingArchive() throws Exception {
         SinkConfig sinkConfig = getSinkConfig();
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
index da6e99f..cbf121d 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
@@ -20,7 +20,7 @@ package org.apache.pulsar.admin.cli;
 
 import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
 import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
-import static org.mockito.Matchers.eq;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
@@ -33,16 +33,14 @@ import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
 import java.io.File;
 import java.nio.file.Files;
 
-import lombok.extern.slf4j.Slf4j;
-
 import org.apache.pulsar.admin.cli.utils.CmdUtils;
-import org.apache.pulsar.client.admin.Sources;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.Sources;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.Resources;
 import org.apache.pulsar.common.functions.UpdateOptions;
 import org.apache.pulsar.common.io.SourceConfig;
-import org.apache.pulsar.functions.utils.*;
+import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -52,7 +50,6 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.ObjectFactory;
 import org.testng.annotations.Test;
 
-@Slf4j
 @PrepareForTest({CmdFunctions.class})
 @PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.io.core.*" })
 public class TestCmdSources {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index dea384f..3398d36 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -156,6 +156,11 @@ public class MessageImpl<T> implements Message<T> {
     }
 
     public MessageImpl(String topic, String msgId, Map<String, String> properties,
+            byte[] payload, Schema<T> schema) {
+        this(topic, msgId, properties, Unpooled.wrappedBuffer(payload), schema);
+    }
+
+    public MessageImpl(String topic, String msgId, Map<String, String> properties,
                        ByteBuf payload, Schema<T> schema) {
         String[] data = msgId.split(":");
         long ledgerId = Long.parseLong(data[0]);
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/api/MessageRouterTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/api/MessageRouterTest.java
index 5da9f6e..48f5816 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/api/MessageRouterTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/api/MessageRouterTest.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.client.api;
 
-import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
index c969374..ab73b67 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.client.impl;
 
-import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertTrue;
@@ -47,8 +47,8 @@ public class ClientCnxTest {
 
         ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
         ChannelFuture listenerFuture = mock(ChannelFuture.class);
-        when(listenerFuture.addListener(anyObject())).thenReturn(listenerFuture);
-        when(ctx.writeAndFlush(anyObject())).thenReturn(listenerFuture);
+        when(listenerFuture.addListener(any())).thenReturn(listenerFuture);
+        when(ctx.writeAndFlush(any())).thenReturn(listenerFuture);
 
         Field ctxField = PulsarHandler.class.getDeclaredField("ctx");
         ctxField.setAccessible(true);
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
index 5475050..e253ace 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
@@ -34,7 +34,7 @@ import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.mock;
 import static org.powermock.api.mockito.PowerMockito.when;
 import static org.testng.Assert.assertEquals;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
index d6877c1..862237a 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.client.impl;
 
 import org.apache.pulsar.client.api.*;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
-import org.mockito.Matchers;
 import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Test;
 
@@ -29,7 +28,8 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
-import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertNotNull;
@@ -51,7 +51,7 @@ public class ProducerBuilderImplTest {
         when(client.newProducer()).thenReturn(producerBuilderImpl);
 
         when(client.createProducerAsync(
-                Matchers.any(ProducerConfigurationData.class), Matchers.any(Schema.class), eq(null)))
+                any(ProducerConfigurationData.class), any(Schema.class), eq(null)))
                 .thenReturn(CompletableFuture.completedFuture(producer));
     }
 
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningAvroSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningAvroSchemaTest.java
index 2ee5797..1b3ef52 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningAvroSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningAvroSchemaTest.java
@@ -25,7 +25,7 @@ import org.apache.pulsar.common.schema.SchemaInfo;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.mock;
 import static org.powermock.api.mockito.PowerMockito.when;
 import static org.testng.Assert.assertEquals;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningKeyValueSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningKeyValueSchemaTest.java
index 375baf5..ebcc00d 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningKeyValueSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningKeyValueSchemaTest.java
@@ -26,7 +26,7 @@ import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.mock;
 import static org.powermock.api.mockito.PowerMockito.when;
 
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchemaTest.java
index 9aefac4..814975d 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchemaTest.java
@@ -28,9 +28,9 @@ import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.mock;
-import static org.powermock.api.mockito.PowerMockito.when;
+import static org.mockito.Mockito.when;
 
 public class GenericAvroSchemaTest {
 
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java
index 79c2486..74133f4 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java
@@ -19,7 +19,7 @@
 package org.apache.pulsar.client.impl.schema.generic;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProviderTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProviderTest.java
index ed14580..2e101b1 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProviderTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProviderTest.java
@@ -18,12 +18,14 @@
  */
 package org.apache.pulsar.client.impl.schema.generic;
 
-import static org.mockito.Matchers.any;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 
-import org.apache.pulsar.client.api.schema.GenericSchema;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.LookupService;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
@@ -34,9 +36,6 @@ import org.apache.pulsar.common.schema.SchemaInfo;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-
 /**
  * Unit test for {@link MultiVersionSchemaInfoProvider}.
  */
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/util/ObjectCacheTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/util/ObjectCacheTest.java
index 771e387..f0c2cbc 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/util/ObjectCacheTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/util/ObjectCacheTest.java
@@ -37,13 +37,14 @@ public class ObjectCacheTest {
         AtomicLong currentTime = new AtomicLong(0);
 
         Clock clock = mock(Clock.class);
-        when(clock.millis()).then(invocation -> currentTime);
+        when(clock.millis()).then(invocation -> currentTime.longValue());
 
         AtomicInteger currentValue = new AtomicInteger(0);
 
         Supplier<Integer> cache = new ObjectCache<>(() -> currentValue.getAndIncrement(),
                 10, TimeUnit.MILLISECONDS, clock);
 
+        cache.get();
         assertEquals(cache.get().intValue(), 0);
         assertEquals(cache.get().intValue(), 0);
 
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java
index fdbbc9f..f458c5d 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java
@@ -28,6 +28,7 @@ import org.testng.annotations.Test;
 @Test
 public class TopicNameTest {
 
+    @SuppressWarnings("deprecation")
     @Test
     void topic() {
         try {
@@ -215,6 +216,7 @@ public class TopicNameTest {
         assertEquals(name.getPersistenceNamingEncoding(), "prop/colo/ns/persistent/" + encodedName);
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void testTopicNameWithoutCluster() throws Exception {
         TopicName topicName = TopicName.get("persistent://tenant/namespace/topic");
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/ByteBufPairTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/ByteBufPairTest.java
index 548b7be..b17f50a 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/ByteBufPairTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/ByteBufPairTest.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.common.protocol;
 
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/PulsarDecoderTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/PulsarDecoderTest.java
index 7f41e49..89e4871 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/PulsarDecoderTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/PulsarDecoderTest.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.common.protocol;
 
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.CALLS_REAL_METHODS;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
index 7fd48c2..83a7549 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.flink.streaming.connectors.pulsar;
 
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.avro.generated.NasaMission;
@@ -30,7 +31,6 @@ import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.mockito.Mockito;
-import org.mockito.internal.util.reflection.Whitebox;
 import org.powermock.api.mockito.PowerMockito;
 import org.testng.annotations.Test;
 
@@ -109,10 +109,10 @@ public class PulsarAvroTableSinkTest {
                 Mockito.any(SerializationSchema.class),
                 Mockito.any(PulsarKeyExtractor.class)
         ).thenReturn(producer);
-        Whitebox.setInternalState(sink, "fieldNames", fieldNames);
-        Whitebox.setInternalState(sink, "fieldTypes", typeInformations);
-        Whitebox.setInternalState(sink, "serializationSchema", Mockito.mock(SerializationSchema.class));
-        Whitebox.setInternalState(sink, "keyExtractor", Mockito.mock(PulsarKeyExtractor.class));
+        FieldUtils.writeField(sink, "fieldNames", fieldNames, true);
+        FieldUtils.writeField(sink, "fieldTypes", typeInformations, true);
+        FieldUtils.writeField(sink, "serializationSchema", Mockito.mock(SerializationSchema.class), true);
+        FieldUtils.writeField(sink, "keyExtractor", Mockito.mock(PulsarKeyExtractor.class), true);
         return sink;
     }
 
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java
index b96f971..82d831b 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java
@@ -38,7 +38,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.MessageImpl;
-import org.apache.pulsar.shade.io.netty.buffer.Unpooled;
+import io.netty.buffer.Unpooled;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -57,7 +57,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
 
 /**
  * Tests for the PulsarConsumerSource. The source supports two operation modes.
@@ -542,7 +542,7 @@ public class PulsarConsumerSourceTests {
 
     private static Message<byte[]> createMessage(String content, String messageId) {
         return new MessageImpl<byte[]>("my-topic", messageId, Collections.emptyMap(),
-                                       Unpooled.wrappedBuffer(content.getBytes()), Schema.BYTES);
+                                       content.getBytes(), Schema.BYTES);
     }
 
     private static String createMessageId(long ledgerId, long entryId, long partitionIndex) {
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
index 668a8e5..02462b3 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.flink.streaming.connectors.pulsar;
 
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -29,7 +30,6 @@ import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.mockito.Mockito;
-import org.mockito.internal.util.reflection.Whitebox;
 import org.powermock.api.mockito.PowerMockito;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -105,10 +105,11 @@ public class PulsarJsonTableSinkTest {
                 Mockito.any(SerializationSchema.class),
                 Mockito.any(PulsarKeyExtractor.class)
         ).thenReturn(producer);
-        Whitebox.setInternalState(sink, "fieldNames", fieldNames);
-        Whitebox.setInternalState(sink, "fieldTypes", typeInformations);
-        Whitebox.setInternalState(sink, "serializationSchema", Mockito.mock(SerializationSchema.class));
-        Whitebox.setInternalState(sink, "keyExtractor", Mockito.mock(PulsarKeyExtractor.class));
+
+        FieldUtils.writeField(sink, "fieldNames", fieldNames, true);
+        FieldUtils.writeField(sink, "fieldTypes", typeInformations, true);
+        FieldUtils.writeField(sink, "serializationSchema", Mockito.mock(SerializationSchema.class), true);
+        FieldUtils.writeField(sink, "keyExtractor", Mockito.mock(PulsarKeyExtractor.class), true);
         return sink;
     }
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index fd81260..4843362 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -46,7 +46,7 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T>
     private final PulsarSourceConfig pulsarSourceConfig;
     private final Map<String, String> properties;
     private List<String> inputTopics;
-    private List<Consumer<T>> inputConsumers;
+    private List<Consumer<T>> inputConsumers = Collections.emptyList();
     private final TopicSchema topicSchema;
 
     public PulsarSource(PulsarClient pulsarClient, PulsarSourceConfig pulsarConfig, Map<String, String> properties) {
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index 43a5649..8ababe4 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -18,7 +18,24 @@
  */
 package org.apache.pulsar.functions.instance;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.same;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 import io.prometheus.client.CollectorRegistry;
+
+import java.nio.ByteBuffer;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
@@ -31,26 +48,10 @@ import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.instance.state.StateContextImpl;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider;
-import org.mockito.Matchers;
 import org.slf4j.Logger;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import java.nio.ByteBuffer;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.same;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
 /**
  * Unit test {@link ContextImpl}.
  */
@@ -72,7 +73,7 @@ public class ContextImplTest {
         logger = mock(Logger.class);
         client = mock(PulsarClientImpl.class);
         when(client.newProducer()).thenReturn(new ProducerBuilderImpl(client, Schema.BYTES));
-        when(client.createProducerAsync(Matchers.any(ProducerConfigurationData.class), Matchers.any(Schema.class), eq(null)))
+        when(client.createProducerAsync(any(ProducerConfigurationData.class), any(), any()))
                 .thenReturn(CompletableFuture.completedFuture(producer));
         when(client.getSchema(anyString())).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
         when(producer.sendAsync(anyString())).thenReturn(CompletableFuture.completedFuture(null));
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java
deleted file mode 100644
index a11062f..0000000
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java
+++ /dev/null
@@ -1,1012 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-//package org.apache.pulsar.functions.instance;
-//
-//import static java.nio.charset.StandardCharsets.UTF_8;
-//import static org.mockito.Matchers.any;
-//import static org.mockito.Matchers.anyList;
-//import static org.mockito.Matchers.anyLong;
-//import static org.mockito.Matchers.anyString;
-//import static org.mockito.Matchers.eq;
-//import static org.mockito.Matchers.same;
-//import static org.mockito.Mockito.doAnswer;
-//import static org.mockito.Mockito.doNothing;
-//import static org.mockito.Mockito.mock;
-//import static org.mockito.Mockito.spy;
-//import static org.mockito.Mockito.times;
-//import static org.mockito.Mockito.verify;
-//import static org.mockito.Mockito.when;
-//import static org.testng.Assert.assertEquals;
-//import static org.testng.Assert.assertNull;
-//import static org.testng.Assert.assertSame;
-//import static org.testng.Assert.assertTrue;
-//
-//import io.netty.buffer.ByteBuf;
-//import java.util.Arrays;
-//import java.util.Collections;
-//import java.util.HashMap;
-//import java.util.HashSet;
-//import java.util.LinkedList;
-//import java.util.List;
-//import java.util.Map;
-//import java.util.Set;
-//import java.util.TreeMap;
-//import java.util.concurrent.CompletableFuture;
-//import java.util.concurrent.ExecutorService;
-//import java.util.concurrent.Executors;
-//import java.util.concurrent.LinkedBlockingQueue;
-//import java.util.concurrent.TimeUnit;
-//import lombok.Cleanup;
-//import lombok.Data;
-//import lombok.extern.slf4j.Slf4j;
-//import org.apache.bookkeeper.api.StorageClient;
-//import org.apache.bookkeeper.api.kv.Table;
-//import org.apache.bookkeeper.clients.StorageClientBuilder;
-//import org.apache.bookkeeper.clients.admin.StorageAdminClient;
-//import org.apache.bookkeeper.clients.config.StorageClientSettings;
-//import org.apache.bookkeeper.common.concurrent.FutureUtils;
-//import org.apache.bookkeeper.stream.proto.StreamProperties;
-//import org.apache.commons.lang3.tuple.Pair;
-//import org.apache.pulsar.client.api.Consumer;
-//import org.apache.pulsar.client.api.ConsumerConfiguration;
-//import org.apache.pulsar.client.api.Message;
-//import org.apache.pulsar.client.api.MessageBuilder;
-//import org.apache.pulsar.client.api.MessageId;
-//import org.apache.pulsar.client.api.Producer;
-//import org.apache.pulsar.client.api.ProducerConfiguration;
-//import org.apache.pulsar.client.api.PulsarClient;
-//import org.apache.pulsar.client.impl.MessageIdImpl;
-//import org.apache.pulsar.client.impl.PulsarClientImpl;
-//import org.apache.pulsar.functions.api.Context;
-//import org.apache.pulsar.functions.api.Function;
-//import org.apache.pulsar.functions.api.utils.DefaultSerDe;
-//import org.apache.pulsar.functions.instance.processors.AtLeastOnceProcessor;
-//import org.apache.pulsar.functions.instance.processors.MessageProcessor;
-//import org.apache.pulsar.functions.proto.Function.FunctionDetails;
-//import org.apache.pulsar.functions.proto.Function.FunctionDetails.ProcessingGuarantees;
-//import org.apache.pulsar.functions.utils.Reflections;
-//import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
-//import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
-//import org.apache.pulsar.functions.utils.Utils;
-//import org.powermock.api.mockito.PowerMockito;
-//import org.powermock.core.classloader.annotations.PowerMockIgnore;
-//import org.powermock.core.classloader.annotations.PrepareForTest;
-//import org.powermock.reflect.Whitebox;
-//import org.testng.IObjectFactory;
-//import org.testng.annotations.BeforeMethod;
-//import org.testng.annotations.ObjectFactory;
-//import org.testng.annotations.Test;
-//
-///**
-// * Test the processing logic of a {@link JavaInstanceRunnable}.
-// */
-//@Slf4j
-//@PrepareForTest({ JavaInstanceRunnable.class, StorageClientBuilder.class, MessageBuilder.class, Reflections.class })
-//@PowerMockIgnore({ "javax.management.*", "org.apache.pulsar.common.api.proto.*", "org.apache.logging.log4j.*", "org/apache/pulsar/common/api/proto/PulsarApi*", "org.apache.pulsar.common.util.protobuf.*", "org.apache.pulsar.shade.*" })
-//public class JavaInstanceRunnableProcessTest {
-//
-//    @ObjectFactory
-//    public IObjectFactory getObjectFactory() {
-//        return new org.powermock.modules.testng.PowerMockObjectFactory();
-//    }
-//
-//    private static class TestFunction implements Function<String, String> {
-//        @Override
-//        public String process(String input, Context context) throws Exception {
-//            return input + "!";
-//        }
-//    }
-//
-//    private static class TestFailureFunction implements Function<String, String> {
-//
-//        private int processId2Count = 0;
-//
-//        @Override
-//        public String process(String input, Context context) throws Exception {
-//            int id = Integer.parseInt(input.replace("message-", ""));
-//            if (id % 2 == 0) {
-//                if (id == 2) {
-//                    processId2Count++;
-//                    if (processId2Count > 1) {
-//                        return input + "!";
-//                    }
-//                }
-//                throw new Exception("Failed to process message " + id);
-//            }
-//            return input + "!";
-//        }
-//    }
-//
-//    private static class TestVoidFunction implements Function<String, Void> {
-//
-//        @Override
-//        public Void process(String input, Context context) throws Exception {
-//            log.info("process input '{}'", input);
-//            voidFunctionQueue.put(input);
-//            return null;
-//        }
-//    }
-//
-//    @Data
-//    private static class ConsumerInstance {
-//        private final Consumer consumer;
-//        private final ConsumerConfiguration conf;
-//        private final TreeMap<MessageId, Message> messages;
-//
-//        public ConsumerInstance(Consumer consumer,
-//                                ConsumerConfiguration conf) {
-//            this.consumer = consumer;
-//            this.conf = conf;
-//            this.messages = new TreeMap<>();
-//        }
-//
-//        public synchronized int getNumMessages() {
-//            return this.messages.size();
-//        }
-//
-//        public synchronized void addMessage(Message message) {
-//            this.messages.put(message.getMessageId(), message);
-//        }
-//
-//        public synchronized void removeMessage(MessageId msgId) {
-//            this.messages.remove(msgId);
-//        }
-//
-//        public synchronized boolean containMessage(MessageId msgId) {
-//            return this.messages.containsKey(msgId);
-//        }
-//
-//        public synchronized void removeMessagesBefore(MessageId targetMsgId) {
-//            Set<MessageId> messagesToRemove = new HashSet<>();
-//            messages.forEach((msgId, message) -> {
-//                if (msgId.compareTo(targetMsgId) <= 0) {
-//                    messagesToRemove.add(msgId);
-//                }
-//            });
-//            for (MessageId msgId : messagesToRemove) {
-//                messages.remove(msgId);
-//            }
-//        }
-//    }
-//
-//    @Data
-//    private static class ProducerInstance {
-//        private final Producer producer;
-//        private final LinkedBlockingQueue<Message> msgQueue;
-//        private final List<CompletableFuture<MessageId>> sendFutures;
-//
-//        public ProducerInstance(Producer producer,
-//                                LinkedBlockingQueue<Message> msgQueue) {
-//            this.producer = producer;
-//            this.msgQueue = msgQueue;
-//            this.sendFutures = new LinkedList<>();
-//        }
-//
-//        public synchronized void addSendFuture(CompletableFuture<MessageId> future) {
-//            this.sendFutures.add(future);
-//        }
-//
-//    }
-//
-//
-//    private static final String TEST_STORAGE_SERVICE_URL = "127.0.0.1:4181";
-//    private static final LinkedBlockingQueue<String> voidFunctionQueue = new LinkedBlockingQueue<>();
-//
-//    private FunctionDetails functionDetails;
-//    private InstanceConfig config;
-//    private FunctionCacheManager fnCache;
-//    private PulsarClient mockClient;
-//    private FunctionStats mockFunctionStats;
-//
-//    private final Map<Pair<String, String>, ProducerInstance> mockProducers
-//            = Collections.synchronizedMap(new HashMap<>());
-//    private final Map<Pair<String, String>, ConsumerInstance> mockConsumers
-//            = Collections.synchronizedMap(new HashMap<>());
-//    private StorageClient mockStorageClient;
-//    private Table<ByteBuf, ByteBuf> mockTable;
-//
-//    @BeforeMethod
-//    public void setup() throws Exception {
-//        mockProducers.clear();
-//        mockConsumers.clear();
-//
-//        functionDetails = FunctionDetails.newBuilder()
-//                .setAutoAck(true)
-//                .setClassName(TestFunction.class.getName())
-//                .addInputs("test-src-topic")
-//                .setName("test-function")
-//                .setOutput("test-output-topic")
-//                .setProcessingGuarantees(ProcessingGuarantees.ATLEAST_ONCE)
-//                .setTenant("test-tenant")
-//                .setNamespace("test-namespace")
-//                .build();
-//
-//        config = new InstanceConfig();
-//        config.setFunctionId("test-function-id");
-//        config.setFunctionVersion("v1");
-//        config.setInstanceId("test-instance-id");
-//        config.setMaxBufferedTuples(1000);
-//        config.setFunctionDetails(functionDetails);
-//
-//        mockClient = mock(PulsarClientImpl.class);
-//
-//        // mock FunctionCacheManager
-//        fnCache = mock(FunctionCacheManager.class);
-//        doNothing().when(fnCache).registerFunctionInstance(anyString(), anyString(), anyList(), anyList());
-//        doNothing().when(fnCache).unregisterFunctionInstance(anyString(), anyString());
-//
-//        ClassLoader clsLoader = JavaInstanceRunnableTest.class.getClassLoader();
-//        when(fnCache.getClassLoader(anyString()))
-//                .thenReturn(clsLoader);
-//
-//        // mock producer & consumer
-//        when(mockClient.createProducer(anyString(), any(ProducerConfiguration.class)))
-//                .thenAnswer(invocationOnMock -> {
-//                    String topic = invocationOnMock.getArgumentAt(0, String.class);
-//                    ProducerConfiguration conf = invocationOnMock.getArgumentAt(1, ProducerConfiguration.class);
-//                    String producerName = conf.getProducerName();
-//
-//                    Pair<String, String> pair = Pair.of(topic, producerName);
-//                    ProducerInstance producerInstance = mockProducers.get(pair);
-//                    if (null == producerInstance) {
-//                        Producer producer = mock(Producer.class);
-//                        LinkedBlockingQueue<Message> msgQueue = new LinkedBlockingQueue<>();
-//                        final ProducerInstance instance = new ProducerInstance(producer, msgQueue);
-//                        producerInstance = instance;
-//                        when(producer.getProducerName())
-//                                .thenReturn(producerName);
-//                        when(producer.getTopic())
-//                                .thenReturn(topic);
-//                        when(producer.sendAsync(any(Message.class)))
-//                                .thenAnswer(invocationOnMock1 -> {
-//                                    Message msg = invocationOnMock1.getArgumentAt(0, Message.class);
-//                                    log.info("producer send message {}", msg);
-//
-//                                    CompletableFuture<MessageId> future = new CompletableFuture<>();
-//                                    instance.addSendFuture(future);
-//                                    msgQueue.put(msg);
-//                                    return future;
-//                                });
-//                        when(producer.closeAsync()).thenReturn(FutureUtils.Void());
-//
-//                        mockProducers.put(pair, producerInstance);
-//                    }
-//                    return producerInstance.getProducer();
-//                });
-//        when(mockClient.subscribe(
-//                anyString(),
-//                anyString(),
-//                any(ConsumerConfiguration.class)
-//        )).thenAnswer(invocationOnMock -> {
-//            String topic = invocationOnMock.getArgumentAt(0, String.class);
-//            String subscription = invocationOnMock.getArgumentAt(1, String.class);
-//            ConsumerConfiguration conf = invocationOnMock.getArgumentAt(2, ConsumerConfiguration.class);
-//
-//            Pair<String, String> pair = Pair.of(topic, subscription);
-//            ConsumerInstance consumerInstance = mockConsumers.get(pair);
-//            if (null == consumerInstance) {
-//                Consumer consumer = mock(Consumer.class);
-//
-//                ConsumerInstance instance = new ConsumerInstance(consumer, conf);
-//                consumerInstance = instance;
-//                when(consumer.getTopic()).thenReturn(topic);
-//                when(consumer.getSubscription()).thenReturn(subscription);
-//                when(consumer.acknowledgeAsync(any(Message.class)))
-//                        .thenAnswer(invocationOnMock1 -> {
-//                            Message msg = invocationOnMock1.getArgumentAt(0, Message.class);
-//                            log.info("Ack message {} : message id = {}", msg, msg.getMessageId());
-//
-//                            instance.removeMessage(msg.getMessageId());
-//                            return FutureUtils.Void();
-//                        });
-//                when(consumer.acknowledgeCumulativeAsync(any(Message.class)))
-//                        .thenAnswer(invocationOnMock1 -> {
-//                            Message msg = invocationOnMock1.getArgumentAt(0, Message.class);
-//                            log.info("Ack message cumulatively message id = {}", msg, msg.getMessageId());
-//
-//                            instance.removeMessagesBefore(msg.getMessageId());
-//                            return FutureUtils.Void();
-//                        });
-//                when(consumer.closeAsync())
-//                        .thenAnswer(invocationOnMock1 -> {
-//                            mockConsumers.remove(pair, instance);
-//                            return FutureUtils.Void();
-//                        });
-//                doAnswer(invocationOnMock1 -> {
-//                    mockConsumers.remove(pair, instance);
-//                    return null;
-//                }).when(consumer).close();
-//
-//
-//                mockConsumers.put(pair, consumerInstance);
-//            }
-//            return consumerInstance.getConsumer();
-//        });
-//
-//        //
-//        // Mock State Store
-//        //
-//
-//        StorageClientBuilder mockBuilder = mock(StorageClientBuilder.class);
-//        when(mockBuilder.withNamespace(anyString())).thenReturn(mockBuilder);
-//        when(mockBuilder.withSettings(any(StorageClientSettings.class))).thenReturn(mockBuilder);
-//        this.mockStorageClient = mock(StorageClient.class);
-//        when(mockBuilder.build()).thenReturn(mockStorageClient);
-//        StorageAdminClient adminClient = mock(StorageAdminClient.class);
-//        when(mockBuilder.buildAdmin()).thenReturn(adminClient);
-//
-//        PowerMockito.mockStatic(StorageClientBuilder.class);
-//        PowerMockito.when(
-//                StorageClientBuilder.newBuilder()
-//        ).thenReturn(mockBuilder);
-//
-//        when(adminClient.getStream(anyString(), anyString())).thenReturn(FutureUtils.value(
-//                StreamProperties.newBuilder().build()));
-//        mockTable = mock(Table.class);
-//        when(mockStorageClient.openTable(anyString())).thenReturn(FutureUtils.value(mockTable));
-//
-//        //
-//        // Mock Function Stats
-//        //
-//
-//        mockFunctionStats = spy(new FunctionStats());
-//        PowerMockito.whenNew(FunctionStats.class)
-//                .withNoArguments()
-//                .thenReturn(mockFunctionStats);
-//
-//        // Mock message builder
-//        PowerMockito.mockStatic(MessageBuilder.class);
-//        PowerMockito.when(MessageBuilder.create())
-//                .thenAnswer(invocationOnMock -> {
-//
-//                    Message msg = mock(Message.class);
-//                    MessageBuilder builder = mock(MessageBuilder.class);
-//                    when(builder.setContent(any(byte[].class)))
-//                            .thenAnswer(invocationOnMock1 -> {
-//                                byte[] content = invocationOnMock1.getArgumentAt(0, byte[].class);
-//                                when(msg.getData()).thenReturn(content);
-//                                return builder;
-//                            });
-//                    when(builder.setSequenceId(anyLong()))
-//                            .thenAnswer(invocationOnMock1 -> {
-//                                long seqId = invocationOnMock1.getArgumentAt(0, long.class);
-//                                when(msg.getSequenceId()).thenReturn(seqId);
-//                                return builder;
-//                            });
-//                    when(builder.setProperty(anyString(), anyString()))
-//                            .thenAnswer(invocationOnMock1 -> {
-//                                String key = invocationOnMock1.getArgumentAt(0, String.class);
-//                                String value = invocationOnMock1.getArgumentAt(1, String.class);
-//                                when(msg.getProperty(eq(key))).thenReturn(value);
-//                                return builder;
-//                            });
-//                    when(builder.build()).thenReturn(msg);
-//                    return builder;
-//                });
-//    }
-//
-//    /**
-//     * Test the basic run logic of instance.
-//     */
-//    @Test
-//    public void testSetupJavaInstance() throws Exception {
-//        JavaInstanceRunnable runnable = new JavaInstanceRunnable(
-//                config,
-//                fnCache,
-//                "test-jar-file",
-//                mockClient,
-//                TEST_STORAGE_SERVICE_URL);
-//
-//        runnable.setupJavaInstance();
-//
-//        // verify
-//
-//        // 1. verify jar is loaded
-//        verify(fnCache, times(1))
-//                .registerFunctionInstance(
-//                        eq(config.getFunctionId()),
-//                        eq(config.getInstanceId()),
-//                        eq(Arrays.asList("test-jar-file")),
-//                        eq(Collections.emptyList())
-//                );
-//        verify(fnCache, times(1))
-//                .getClassLoader(eq(config.getFunctionId()));
-//
-//        // 2. verify serde is setup
-//        for (String inputTopic : functionDetails.getInputsList()) {
-//            assertTrue(runnable.getInputSerDe().containsKey(inputTopic));
-//            assertTrue(runnable.getInputSerDe().get(inputTopic) instanceof DefaultSerDe);
-//            DefaultSerDe serDe = (DefaultSerDe) runnable.getInputSerDe().get(inputTopic);
-//            assertEquals(String.class, Whitebox.getInternalState(serDe, "type"));
-//        }
-//
-//        // 3. verify producers and consumers are setup
-//        MessageProcessor processor = runnable.getProcessor();
-//        assertTrue(processor instanceof AtLeastOnceProcessor);
-//        assertSame(mockProducers.get(Pair.of(
-//                functionDetails.getOutput(),
-//                null
-//        )).getProducer(), ((AtLeastOnceProcessor) processor).getProducer());
-//
-//        assertEquals(mockConsumers.size(), processor.getInputConsumers().size());
-//        for (Map.Entry<String, Consumer> consumerEntry : processor.getInputConsumers().entrySet()) {
-//            String topic = consumerEntry.getKey();
-//
-//            Consumer mockConsumer = mockConsumers.get(Pair.of(
-//                    topic,
-//                    FunctionDetailsUtils.getFullyQualifiedName(functionDetails))).getConsumer();
-//            assertSame(mockConsumer, consumerEntry.getValue());
-//        }
-//
-//        // 4. verify state table
-//        assertSame(mockStorageClient, runnable.getStorageClient());
-//        assertSame(mockTable, runnable.getStateTable());
-//
-//        runnable.close();
-//
-//        // verify close
-//        for (ConsumerInstance consumer : mockConsumers.values()) {
-//            verify(consumer.getConsumer(), times(1)).close();
-//        }
-//        assertTrue(processor.getInputConsumers().isEmpty());
-//
-//        for (ProducerInstance producer : mockProducers.values()) {
-//            verify(producer.getProducer(), times(1)).close();
-//        }
-//
-//        verify(mockTable, times(1)).close();
-//        verify(mockStorageClient, times(1)).close();
-//
-//        // function is unregistered
-//        verify(fnCache, times(1)).unregisterFunctionInstance(
-//                eq(config.getFunctionId()), eq(config.getInstanceId()));
-//
-//    }
-//
-//    @Test
-//    public void testAtMostOnceProcessing() throws Exception {
-//        FunctionDetails newFunctionDetails = FunctionDetails.newBuilder(functionDetails)
-//                .setProcessingGuarantees(ProcessingGuarantees.ATMOST_ONCE)
-//                .build();
-//        config.setFunctionDetails(newFunctionDetails);
-//
-//        @Cleanup("shutdown")
-//        ExecutorService executorService = Executors.newSingleThreadExecutor();
-//
-//        try (JavaInstanceRunnable runnable = new JavaInstanceRunnable(
-//                config,
-//                fnCache,
-//                "test-jar-file",
-//                mockClient,
-//                null)) {
-//
-//            executorService.submit(runnable);
-//
-//            Pair<String, String> consumerId = Pair.of(
-//                    newFunctionDetails.getInputs(0),
-//                    FunctionDetailsUtils.getFullyQualifiedName(newFunctionDetails));
-//            ConsumerInstance consumerInstance = mockConsumers.get(consumerId);
-//            while (null == consumerInstance) {
-//                TimeUnit.MILLISECONDS.sleep(20);
-//                consumerInstance = mockConsumers.get(consumerId);
-//            }
-//
-//            ProducerInstance producerInstance = mockProducers.values().iterator().next();
-//
-//            // once we get consumer id, simulate receiving 10 messages from consumer
-//            for (int i = 0; i < 10; i++) {
-//                Message msg = mock(Message.class);
-//                when(msg.getData()).thenReturn(("message-" + i).getBytes(UTF_8));
-//                when(msg.getMessageId())
-//                        .thenReturn(new MessageIdImpl(1L, i, 0));
-//                consumerInstance.addMessage(msg);
-//                consumerInstance.getConf().getMessageListener()
-//                        .received(consumerInstance.getConsumer(), msg);
-//            }
-//
-//            // wait until all the messages are published
-//            for (int i = 0; i < 10; i++) {
-//                Message msg = producerInstance.msgQueue.take();
-//
-//                assertEquals("message-" + i + "!", new String(msg.getData(), UTF_8));
-//                // sequence id is not set for AT_MOST_ONCE processing
-//                assertEquals(0L, msg.getSequenceId());
-//            }
-//
-//            // verify acknowledge before send completes
-//            verify(consumerInstance.getConsumer(), times(10))
-//                    .acknowledgeAsync(any(Message.class));
-//            assertEquals(0, consumerInstance.getNumMessages());
-//
-//            // complete all the publishes
-//            synchronized (producerInstance) {
-//                for (CompletableFuture<MessageId> future : producerInstance.sendFutures) {
-//                    future.complete(mock(MessageId.class));
-//                }
-//            }
-//
-//            // acknowledges count should remain same
-//            verify(consumerInstance.getConsumer(), times(10))
-//                    .acknowledgeAsync(any(Message.class));
-//        }
-//    }
-//
-//    @Test
-//    public void testAtMostOnceProcessingFailures() throws Exception {
-//        FunctionDetails newFunctionDetails = FunctionDetails.newBuilder(functionDetails)
-//                .setProcessingGuarantees(ProcessingGuarantees.ATMOST_ONCE)
-//                .setClassName(TestFailureFunction.class.getName())
-//                .build();
-//        config.setFunctionDetails(newFunctionDetails);
-//
-//        @Cleanup("shutdown")
-//        ExecutorService executorService = Executors.newSingleThreadExecutor();
-//
-//        try (JavaInstanceRunnable runnable = new JavaInstanceRunnable(
-//                config,
-//                fnCache,
-//                "test-jar-file",
-//                mockClient,
-//                null)) {
-//
-//            executorService.submit(runnable);
-//
-//            Pair<String, String> consumerId = Pair.of(
-//                    newFunctionDetails.getInputs(0),
-//                    FunctionDetailsUtils.getFullyQualifiedName(newFunctionDetails));
-//            ConsumerInstance consumerInstance = mockConsumers.get(consumerId);
-//            while (null == consumerInstance) {
-//                TimeUnit.MILLISECONDS.sleep(20);
-//                consumerInstance = mockConsumers.get(consumerId);
-//            }
-//
-//            ProducerInstance producerInstance = mockProducers.values().iterator().next();
-//
-//            // once we get consumer id, simulate receiving 10 messages from consumer
-//            for (int i = 0; i < 10; i++) {
-//                Message msg = mock(Message.class);
-//                when(msg.getData()).thenReturn(("message-" + i).getBytes(UTF_8));
-//                when(msg.getMessageId())
-//                        .thenReturn(new MessageIdImpl(1L, i, 0));
-//                consumerInstance.addMessage(msg);
-//                consumerInstance.getConf().getMessageListener()
-//                        .received(consumerInstance.getConsumer(), msg);
-//            }
-//
-//            // wait until all the messages are published
-//            for (int i = 0; i < 10; i++) {
-//                if (i % 2 == 0) { // all messages (i % 2 == 0) will fail to process.
-//                    continue;
-//                }
-//
-//                Message msg = producerInstance.msgQueue.take();
-//
-//                assertEquals("message-" + i + "!", new String(msg.getData(), UTF_8));
-//                // sequence id is not set for AT_MOST_ONCE processing
-//                assertEquals(0L, msg.getSequenceId());
-//            }
-//
-//            // verify acknowledge before send completes
-//            verify(consumerInstance.getConsumer(), times(10))
-//                    .acknowledgeAsync(any(Message.class));
-//            assertEquals(0, consumerInstance.getNumMessages());
-//
-//            // complete all the publishes
-//            synchronized (producerInstance) {
-//                for (CompletableFuture<MessageId> future : producerInstance.sendFutures) {
-//                    future.complete(mock(MessageId.class));
-//                }
-//            }
-//
-//            // acknowledges count should remain same
-//            verify(consumerInstance.getConsumer(), times(10))
-//                    .acknowledgeAsync(any(Message.class));
-//            assertEquals(0, consumerInstance.getNumMessages());
-//        }
-//    }
-//
-//    @Test
-//    public void testAtLeastOnceProcessing() throws Exception {
-//        FunctionDetails newFunctionDetails = FunctionDetails.newBuilder(functionDetails)
-//                .setProcessingGuarantees(ProcessingGuarantees.ATLEAST_ONCE)
-//                .build();
-//        config.setFunctionDetails(newFunctionDetails);
-//
-//        @Cleanup("shutdown")
-//        ExecutorService executorService = Executors.newSingleThreadExecutor();
-//
-//        try (JavaInstanceRunnable runnable = new JavaInstanceRunnable(
-//                config,
-//                fnCache,
-//                "test-jar-file",
-//                mockClient,
-//                null)) {
-//
-//            executorService.submit(runnable);
-//
-//            Pair<String, String> consumerId = Pair.of(
-//                    newFunctionDetails.getInputs(0),
-//                    FunctionDetailsUtils.getFullyQualifiedName(newFunctionDetails));
-//            ConsumerInstance consumerInstance = mockConsumers.get(consumerId);
-//            while (null == consumerInstance) {
-//                TimeUnit.MILLISECONDS.sleep(20);
-//                consumerInstance = mockConsumers.get(consumerId);
-//            }
-//
-//            ProducerInstance producerInstance = mockProducers.values().iterator().next();
-//
-//            // once we get consumer id, simulate receiving 10 messages from consumer
-//            for (int i = 0; i < 10; i++) {
-//                Message msg = mock(Message.class);
-//                when(msg.getData()).thenReturn(("message-" + i).getBytes(UTF_8));
-//                when(msg.getMessageId())
-//                        .thenReturn(new MessageIdImpl(1L, i, 0));
-//                consumerInstance.addMessage(msg);
-//                consumerInstance.getConf().getMessageListener()
-//                        .received(consumerInstance.getConsumer(), msg);
-//            }
-//
-//            // wait until all the messages are published
-//            for (int i = 0; i < 10; i++) {
-//                Message msg = producerInstance.msgQueue.take();
-//
-//                assertEquals("message-" + i + "!", new String(msg.getData(), UTF_8));
-//                // sequence id is not set for AT_MOST_ONCE processing
-//                assertEquals(0L, msg.getSequenceId());
-//            }
-//
-//            // verify acknowledge before send completes
-//            verify(consumerInstance.getConsumer(), times(0))
-//                    .acknowledgeAsync(any(Message.class));
-//            assertEquals(10, consumerInstance.getNumMessages());
-//
-//            // complete all the publishes
-//            synchronized (producerInstance) {
-//                for (CompletableFuture<MessageId> future : producerInstance.sendFutures) {
-//                    future.complete(mock(MessageId.class));
-//                }
-//            }
-//
-//            // acknowledges count should remain same
-//            verify(consumerInstance.getConsumer(), times(10))
-//                    .acknowledgeAsync(any(Message.class));
-//            assertEquals(0, consumerInstance.getNumMessages());
-//        }
-//    }
-//
-//    @Test
-//    public void testAtLeastOnceProcessingFailures() throws Exception {
-//        FunctionDetails newFunctionDetails = FunctionDetails.newBuilder(functionDetails)
-//                .setProcessingGuarantees(ProcessingGuarantees.ATLEAST_ONCE)
-//                .setClassName(TestFailureFunction.class.getName())
-//                .build();
-//        config.setFunctionDetails(newFunctionDetails);
-//
-//        @Cleanup("shutdown")
-//        ExecutorService executorService = Executors.newSingleThreadExecutor();
-//
-//        try (JavaInstanceRunnable runnable = new JavaInstanceRunnable(
-//                config,
-//                fnCache,
-//                "test-jar-file",
-//                mockClient,
-//                null)) {
-//
-//            executorService.submit(runnable);
-//
-//            Pair<String, String> consumerId = Pair.of(
-//                    newFunctionDetails.getInputs(0),
-//                    FunctionDetailsUtils.getFullyQualifiedName(newFunctionDetails));
-//            ConsumerInstance consumerInstance = mockConsumers.get(consumerId);
-//            while (null == consumerInstance) {
-//                TimeUnit.MILLISECONDS.sleep(20);
-//                consumerInstance = mockConsumers.get(consumerId);
-//            }
-//
-//            ProducerInstance producerInstance = mockProducers.values().iterator().next();
-//
-//            // once we get consumer id, simulate receiving 10 messages from consumer
-//            for (int i = 0; i < 10; i++) {
-//                Message msg = mock(Message.class);
-//                when(msg.getData()).thenReturn(("message-" + i).getBytes(UTF_8));
-//                when(msg.getMessageId())
-//                        .thenReturn(new MessageIdImpl(1L, i, 0));
-//                consumerInstance.addMessage(msg);
-//                consumerInstance.getConf().getMessageListener()
-//                        .received(consumerInstance.getConsumer(), msg);
-//            }
-//
-//            // wait until all the messages are published
-//            for (int i = 0; i < 10; i++) {
-//                if (i % 2 == 0) { // all messages (i % 2 == 0) will fail to process.
-//                    continue;
-//                }
-//
-//                Message msg = producerInstance.msgQueue.take();
-//
-//                assertEquals("message-" + i + "!", new String(msg.getData(), UTF_8));
-//                // sequence id is not set for AT_MOST_ONCE processing
-//                assertEquals(0L, msg.getSequenceId());
-//            }
-//
-//            // verify acknowledge before send completes
-//            verify(consumerInstance.getConsumer(), times(0))
-//                    .acknowledgeAsync(any(Message.class));
-//            assertEquals(10, consumerInstance.getNumMessages());
-//
-//            // complete all the publishes
-//            synchronized (producerInstance) {
-//                for (CompletableFuture<MessageId> future : producerInstance.sendFutures) {
-//                    future.complete(mock(MessageId.class));
-//                }
-//            }
-//
-//            // only 5 succeed messages are acknowledged
-//            verify(consumerInstance.getConsumer(), times(5))
-//                    .acknowledgeAsync(any(Message.class));
-//            assertEquals(5, consumerInstance.getNumMessages());
-//            for (int i = 0; i < 10; i++) {
-//                assertEquals(
-//                        i % 2 == 0,
-//                        consumerInstance.containMessage(new MessageIdImpl(1L, i, 0)));
-//            }
-//        }
-//    }
-//
-//    @Test
-//    public void testEffectivelyOnceProcessing() throws Exception {
-//        FunctionDetails newFunctionDetails = FunctionDetails.newBuilder(functionDetails)
-//                .setProcessingGuarantees(ProcessingGuarantees.EFFECTIVELY_ONCE)
-//                .build();
-//        config.setFunctionDetails(newFunctionDetails);
-//
-//        @Cleanup("shutdown")
-//        ExecutorService executorService = Executors.newSingleThreadExecutor();
-//
-//        try (JavaInstanceRunnable runnable = new JavaInstanceRunnable(
-//                config,
-//                fnCache,
-//                "test-jar-file",
-//                mockClient,
-//                null)) {
-//
-//            executorService.submit(runnable);
-//
-//            Pair<String, String> consumerId = Pair.of(
-//                    newFunctionDetails.getInputs(0),
-//                    FunctionDetailsUtils.getFullyQualifiedName(newFunctionDetails));
-//            ConsumerInstance consumerInstance = mockConsumers.get(consumerId);
-//            while (null == consumerInstance) {
-//                TimeUnit.MILLISECONDS.sleep(20);
-//                consumerInstance = mockConsumers.get(consumerId);
-//            }
-//
-//            // once we get consumer id, simulate receiving 10 messages from consumer
-//            for (int i = 0; i < 10; i++) {
-//                Message msg = mock(Message.class);
-//                when(msg.getData()).thenReturn(("message-" + i).getBytes(UTF_8));
-//                when(msg.getMessageId())
-//                        .thenReturn(new MessageIdImpl(1L, i, 0));
-//                consumerInstance.addMessage(msg);
-//                consumerInstance.getConf().getMessageListener()
-//                        .received(consumerInstance.getConsumer(), msg);
-//            }
-//
-//            ProducerInstance producerInstance;
-//            while (mockProducers.isEmpty()) {
-//                TimeUnit.MILLISECONDS.sleep(20);
-//            }
-//            producerInstance = mockProducers.values().iterator().next();
-//
-//            // wait until all the messages are published
-//            for (int i = 0; i < 10; i++) {
-//                Message msg = producerInstance.msgQueue.take();
-//
-//                assertEquals("message-" + i + "!", new String(msg.getData(), UTF_8));
-//                // sequence id is not set for AT_MOST_ONCE processing
-//                assertEquals(
-//                        Utils.getSequenceId(
-//                                new MessageIdImpl(1L, i, 0)),
-//                        msg.getSequenceId());
-//            }
-//
-//            // verify acknowledge before send completes
-//            verify(consumerInstance.getConsumer(), times(0))
-//                    .acknowledgeCumulativeAsync(any(Message.class));
-//            assertEquals(10, consumerInstance.getNumMessages());
-//
-//            // complete all the publishes
-//            synchronized (producerInstance) {
-//                for (CompletableFuture<MessageId> future : producerInstance.sendFutures) {
-//                    future.complete(mock(MessageId.class));
-//                }
-//            }
-//
-//            // acknowledges count should remain same
-//            verify(consumerInstance.getConsumer(), times(10))
-//                    .acknowledgeCumulativeAsync(any(Message.class));
-//            assertEquals(0, consumerInstance.getNumMessages());
-//        }
-//    }
-//
-//    @Test
-//    public void testEffectivelyOnceProcessingFailures() throws Exception {
-//        FunctionDetails newFunctionDetails = FunctionDetails.newBuilder(functionDetails)
-//                .setProcessingGuarantees(ProcessingGuarantees.EFFECTIVELY_ONCE)
-//                .setClassName(TestFailureFunction.class.getName())
-//                .build();
-//        config.setFunctionDetails(newFunctionDetails);
-//
-//        @Cleanup("shutdown")
-//        ExecutorService executorService = Executors.newSingleThreadExecutor();
-//
-//        try (JavaInstanceRunnable runnable = new JavaInstanceRunnable(
-//                config,
-//                fnCache,
-//                "test-jar-file",
-//                mockClient,
-//                null)) {
-//
-//            executorService.submit(runnable);
-//
-//            Pair<String, String> consumerId = Pair.of(
-//                    newFunctionDetails.getInputs(0),
-//                    FunctionDetailsUtils.getFullyQualifiedName(newFunctionDetails));
-//            ConsumerInstance consumerInstance = mockConsumers.get(consumerId);
-//            while (null == consumerInstance) {
-//                TimeUnit.MILLISECONDS.sleep(20);
-//                consumerInstance = mockConsumers.get(consumerId);
-//            }
-//
-//            // once we get consumer id, simulate receiving 2 messages from consumer
-//            Message[] msgs = new Message[2];
-//            for (int i = 1; i <= 2; i++) {
-//                Message msg = mock(Message.class);
-//                when(msg.getData()).thenReturn(("message-" + i).getBytes(UTF_8));
-//                when(msg.getMessageId())
-//                        .thenReturn(new MessageIdImpl(1L, i, 0));
-//
-//                msgs[i-1] = msg;
-//
-//                consumerInstance.addMessage(msg);
-//                consumerInstance.getConf().getMessageListener()
-//                        .received(consumerInstance.getConsumer(), msg);
-//            }
-//
-//            ProducerInstance producerInstance;
-//            while (mockProducers.isEmpty()) {
-//                TimeUnit.MILLISECONDS.sleep(20);
-//            }
-//            producerInstance = mockProducers.values().iterator().next();
-//
-//            // only first message is published, the second message is not
-//            Message msg = producerInstance.msgQueue.take();
-//            assertEquals("message-1!", new String(msg.getData(), UTF_8));
-//            assertEquals(
-//                    Utils.getSequenceId(
-//                            new MessageIdImpl(1L, 1, 0)),
-//                    msg.getSequenceId());
-//            assertNull(producerInstance.msgQueue.poll());
-//
-//            // the first result message is sent but the send future is not completed yet
-//            // so no acknowledge would happen
-//            verify(consumerInstance.getConsumer(), times(0))
-//                    .acknowledgeCumulativeAsync(any(Message.class));
-//
-//            // since the second message failed to process, for correctness, the instance
-//            // will close the existing consumer and resubscribe
-//            ConsumerInstance secondInstance = mockConsumers.get(consumerId);
-//            while (null == secondInstance || secondInstance == consumerInstance) {
-//                TimeUnit.MILLISECONDS.sleep(20);
-//                secondInstance = mockConsumers.get(consumerId);
-//            }
-//
-//            Message secondMsg = mock(Message.class);
-//            when(secondMsg.getData()).thenReturn("message-2".getBytes(UTF_8));
-//            when(secondMsg.getMessageId())
-//                    .thenReturn(new MessageIdImpl(1L, 2, 0));
-//            secondInstance.addMessage(secondMsg);
-//            secondInstance.getConf().getMessageListener()
-//                    .received(secondInstance.getConsumer(), secondMsg);
-//
-//            Message secondReceivedMsg = producerInstance.msgQueue.take();
-//            assertEquals("message-2!", new String(secondReceivedMsg.getData(), UTF_8));
-//            assertEquals(
-//                    Utils.getSequenceId(
-//                            new MessageIdImpl(1L, 2, 0)),
-//                    secondReceivedMsg.getSequenceId());
-//
-//            // the first result message is sent
-//            verify(secondInstance.getConsumer(), times(0))
-//                    .acknowledgeCumulativeAsync(any(Message.class));
-//
-//            // complete all the publishes
-//            synchronized (producerInstance) {
-//                assertEquals(2, producerInstance.sendFutures.size());
-//                for (CompletableFuture<MessageId> future : producerInstance.sendFutures) {
-//                    future.complete(mock(MessageId.class));
-//                }
-//            }
-//
-//            // all 2 messages are sent
-//            verify(consumerInstance.getConsumer(), times(1))
-//                    .acknowledgeCumulativeAsync(same(msgs[0]));
-//            verify(consumerInstance.getConsumer(), times(0))
-//                    .acknowledgeCumulativeAsync(same(msgs[1]));
-//            verify(consumerInstance.getConsumer(), times(0))
-//                    .acknowledgeCumulativeAsync(same(secondMsg));
-//            verify(secondInstance.getConsumer(), times(0))
-//                    .acknowledgeCumulativeAsync(same(msgs[0]));
-//            verify(secondInstance.getConsumer(), times(0))
-//                    .acknowledgeCumulativeAsync(same(msgs[1]));
-//        }
-//    }
-//
-//    @Test
-//    public void testVoidFunction() throws Exception {
-//        FunctionDetails newFunctionDetails = FunctionDetails.newBuilder(functionDetails)
-//                .setProcessingGuarantees(ProcessingGuarantees.ATLEAST_ONCE)
-//                .setClassName(TestVoidFunction.class.getName())
-//                .build();
-//        config.setFunctionDetails(newFunctionDetails);
-//
-//        @Cleanup("shutdown")
-//        ExecutorService executorService = Executors.newSingleThreadExecutor();
-//
-//        try (JavaInstanceRunnable runnable = new JavaInstanceRunnable(
-//                config,
-//                fnCache,
-//                "test-jar-file",
-//                mockClient,
-//                null)) {
-//
-//            executorService.submit(runnable);
-//
-//            Pair<String, String> consumerId = Pair.of(
-//                    newFunctionDetails.getInputs(0),
-//                    FunctionDetailsUtils.getFullyQualifiedName(newFunctionDetails));
-//            ConsumerInstance consumerInstance = mockConsumers.get(consumerId);
-//            while (null == consumerInstance) {
-//                TimeUnit.MILLISECONDS.sleep(20);
-//                consumerInstance = mockConsumers.get(consumerId);
-//            }
-//
-//            // once we get consumer id, simulate receiving 10 messages from consumer
-//            for (int i = 0; i < 10; i++) {
-//                Message msg = mock(Message.class);
-//                when(msg.getData()).thenReturn(("message-" + i).getBytes(UTF_8));
-//                when(msg.getMessageId())
-//                        .thenReturn(new MessageIdImpl(1L, i, 0));
-//                consumerInstance.addMessage(msg);
-//                consumerInstance.getConf().getMessageListener()
-//                        .received(consumerInstance.getConsumer(), msg);
-//            }
-//
-//            // wait until all the messages are published
-//            for (int i = 0; i < 10; i++) {
-//                String msg = voidFunctionQueue.take();
-//                log.info("Processed message {}", msg);
-//                assertEquals("message-" + i, msg);
-//            }
-//
-//            // no producer should be initialized
-//            assertTrue(mockProducers.isEmpty());
-//        }
-//    }
-//}
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/StateContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/StateContextImplTest.java
index 2805a3d..04d0482 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/StateContextImplTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/StateContextImplTest.java
@@ -28,9 +28,9 @@ import org.testng.annotations.Test;
 import java.nio.ByteBuffer;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
index 1ef72c7..700267e 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
@@ -18,9 +18,30 @@
  */
 package org.apache.pulsar.functions.sink;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
 import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.MessageId;
@@ -38,31 +59,10 @@ import org.apache.pulsar.functions.instance.SinkRecord;
 import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
 import org.apache.pulsar.functions.source.TopicSchema;
 import org.apache.pulsar.io.core.SinkContext;
-import org.mockito.ArgumentMatcher;
 import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyList;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.argThat;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.testng.AssertJUnit.assertEquals;
-import static org.testng.AssertJUnit.assertTrue;
-import static org.testng.AssertJUnit.fail;
-
 @Slf4j
 public class PulsarSinkTest {
 
@@ -109,7 +109,7 @@ public class PulsarSinkTest {
         doReturn(producerBuilder).when(producerBuilder).property(anyString(), anyString());
         doReturn(producerBuilder).when(producerBuilder).properties(any());
         doReturn(producerBuilder).when(producerBuilder).sendTimeout(anyInt(), any());
-        
+
         CompletableFuture completableFuture = new CompletableFuture<>();
         completableFuture.complete(mock(MessageId.class));
         TypedMessageBuilder typedMessageBuilder = mock(TypedMessageBuilder.class);
@@ -297,14 +297,11 @@ public class PulsarSinkTest {
             } else {
                 Assert.assertTrue(pulsarSinkAtLeastOnceProcessor.publishProducers.containsKey(defaultTopic));
             }
-            verify(pulsarClient.newProducer(), times(1)).topic(argThat(new ArgumentMatcher<String>() {
-
-                @Override
-                public boolean matches(Object o) {
-                    if (o instanceof String) {
-                        return getTopicEquals(o, topic, defaultTopic);
-                    }
-                    return false;
+            verify(pulsarClient.newProducer(), times(1)).topic(argThat(otherTopic -> {
+                if (topic != null) {
+                    return topic.equals(otherTopic);
+                } else {
+                    return defaultTopic.equals(otherTopic);
                 }
             }));
         }
@@ -346,15 +343,8 @@ public class PulsarSinkTest {
             } else {
                 Assert.assertTrue(pulsarSinkAtLeastOnceProcessor.publishProducers.containsKey(defaultTopic));
             }
-            verify(pulsarClient.newProducer(), times(1)).topic(argThat(new ArgumentMatcher<String>() {
-
-                @Override
-                public boolean matches(Object o) {
-                    if (o instanceof String) {
-                        return getTopicEquals(o, topic, defaultTopic);
-                    }
-                    return false;
-                }
+            verify(pulsarClient.newProducer(), times(1)).topic(argThat(o -> {
+                return getTopicEquals(o, topic, defaultTopic);
             }));
         }
 
@@ -408,28 +398,15 @@ public class PulsarSinkTest {
             } else {
                 Assert.assertTrue(pulsarSinkEffectivelyOnceProcessor.publishProducers.containsKey(String.format("%s-%s-id-1", defaultTopic, defaultTopic)));
             }
-            verify(pulsarClient.newProducer(), times(1)).topic(argThat(new ArgumentMatcher<String>() {
 
-                @Override
-                public boolean matches(Object o) {
-                    if (o instanceof String) {
-                        return getTopicEquals(o, topic, defaultTopic);
-                    }
-                    return false;
-                }
+            verify(pulsarClient.newProducer(), times(1)).topic(argThat(o -> {
+                return getTopicEquals(o, topic, defaultTopic);
             }));
-            verify(pulsarClient.newProducer(), times(1)).producerName(argThat(new ArgumentMatcher<String>() {
-
-                @Override
-                public boolean matches(Object o) {
-                    if (o instanceof String) {
-                        if (topic != null) {
-                            return String.format("%s-id-1", topic).equals(o);
-                        } else {
-                            return String.format("%s-id-1", defaultTopic).equals(o);
-                        }
-                    }
-                    return false;
+            verify(pulsarClient.newProducer(), times(1)).producerName(argThat(o -> {
+                if (topic != null) {
+                    return String.format("%s-id-1", topic).equals(o);
+                } else {
+                    return String.format("%s-id-1", defaultTopic).equals(o);
                 }
             }));
         }
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
index 43b4bcb..4438bf0 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
@@ -18,10 +18,11 @@
  */
 package org.apache.pulsar.functions.source;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyList;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.anyString;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.testng.AssertJUnit.assertEquals;
@@ -29,12 +30,12 @@ import static org.testng.AssertJUnit.assertFalse;
 import static org.testng.AssertJUnit.assertTrue;
 import static org.testng.AssertJUnit.fail;
 
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
+import lombok.Cleanup;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
@@ -43,9 +44,9 @@ import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.functions.api.SerDe;
 import org.apache.pulsar.common.functions.ConsumerConfig;
 import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.functions.api.SerDe;
 import org.apache.pulsar.io.core.SourceContext;
 import org.testng.annotations.Test;
 
@@ -76,14 +77,14 @@ public class PulsarSourceTest {
      */
     private static PulsarClientImpl getPulsarClient() throws PulsarClientException {
         PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
-        ConsumerBuilder consumerBuilder = mock(ConsumerBuilder.class);
+        ConsumerBuilder<?> consumerBuilder = mock(ConsumerBuilder.class);
         doReturn(consumerBuilder).when(consumerBuilder).topics(anyList());
         doReturn(consumerBuilder).when(consumerBuilder).cryptoFailureAction(any());
-        doReturn(consumerBuilder).when(consumerBuilder).subscriptionName(anyString());
+        doReturn(consumerBuilder).when(consumerBuilder).subscriptionName(any());
         doReturn(consumerBuilder).when(consumerBuilder).subscriptionType(any());
         doReturn(consumerBuilder).when(consumerBuilder).ackTimeout(anyLong(), any());
         doReturn(consumerBuilder).when(consumerBuilder).messageListener(any());
-        Consumer consumer = mock(Consumer.class);
+        Consumer<?> consumer = mock(Consumer.class);
         doReturn(consumer).when(consumerBuilder).subscribe();
         doReturn(consumerBuilder).when(pulsarClient).newConsumer(any());
         doReturn(CompletableFuture.completedFuture(consumer)).when(consumerBuilder).subscribeAsync();
@@ -120,11 +121,13 @@ public class PulsarSourceTest {
 
 
     @Test
-    public void testVoidInputClasses() throws IOException {
+    public void testVoidInputClasses() throws Exception {
         PulsarSourceConfig pulsarConfig = getPulsarConfigs();
         // set type to void
         pulsarConfig.setTypeClassName(Void.class.getName());
-        PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, new HashMap<>());
+
+        @Cleanup
+        PulsarSource<?> pulsarSource = new PulsarSource<>(getPulsarClient(), pulsarConfig, new HashMap<>());
 
         try {
             pulsarSource.open(new HashMap<>(), mock(SourceContext.class));
@@ -142,7 +145,7 @@ public class PulsarSourceTest {
      * Verify that function input type should be consistent with input serde type.
      */
     @Test
-    public void testInconsistentInputType() throws IOException {
+    public void testInconsistentInputType() throws Exception {
         PulsarSourceConfig pulsarConfig = getPulsarConfigs();
         // set type to be inconsistent to that of SerDe
         pulsarConfig.setTypeClassName(Integer.class.getName());
@@ -150,7 +153,9 @@ public class PulsarSourceTest {
         topicSerdeClassNameMap.put("persistent://sample/standalone/ns1/test_result",
                 ConsumerConfig.builder().serdeClassName(TestSerDe.class.getName()).build());
         pulsarConfig.setTopicSchema(topicSerdeClassNameMap);
-        PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, new HashMap<>());
+
+        @Cleanup
+        PulsarSource<?> pulsarSource = new PulsarSource<>(getPulsarClient(), pulsarConfig, new HashMap<>());
         try {
             pulsarSource.open(new HashMap<>(), mock(SourceContext.class));
             fail("Should fail constructing java instance if function type is inconsistent with serde type");
@@ -175,7 +180,9 @@ public class PulsarSourceTest {
         consumerConfigs.put("persistent://sample/standalone/ns1/test_result",
                 ConsumerConfig.builder().serdeClassName(TopicSchema.DEFAULT_SERDE).build());
         pulsarConfig.setTopicSchema(consumerConfigs);
-        PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, new HashMap<>());
+
+        @Cleanup
+        PulsarSource<?> pulsarSource = new PulsarSource<>(getPulsarClient(), pulsarConfig, new HashMap<>());
 
         pulsarSource.open(new HashMap<>(), mock(SourceContext.class));
     }
@@ -188,7 +195,9 @@ public class PulsarSourceTest {
         consumerConfigs.put("persistent://sample/standalone/ns1/test_result",
                 ConsumerConfig.builder().serdeClassName(ComplexSerDe.class.getName()).build());
         pulsarConfig.setTopicSchema(consumerConfigs);
-        PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, new HashMap<>());
+
+        @Cleanup
+        PulsarSource<?> pulsarSource = new PulsarSource<>(getPulsarClient(), pulsarConfig, new HashMap<>());
 
         pulsarSource.setupConsumerConfigs();
     }
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
index ac96b44..67e9ccc 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
@@ -40,8 +40,7 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.*;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.fail;
 import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
@@ -49,7 +48,6 @@ import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
 /**
  * Unit tests for {@link WindowFunctionExecutor}
  */
-@Slf4j
 public class WindowFunctionExecutorTest {
 
     private static class TestWindowFunctionExecutor extends WindowFunctionExecutor<Long, Long> {
@@ -206,22 +204,22 @@ public class WindowFunctionExecutorTest {
     public void testExecuteWithLateTupleStream() throws Exception {
 
         windowConfig.setLateDataTopic("$late");
-        Mockito.doReturn(Optional.of(new Gson().fromJson(new Gson().toJson(windowConfig), Map.class)))
+        doReturn(Optional.of(new Gson().fromJson(new Gson().toJson(windowConfig), Map.class)))
                 .when(context).getUserConfigValue(WindowConfig.WINDOW_CONFIG_KEY);
-        TypedMessageBuilder typedMessageBuilder = Mockito.mock(TypedMessageBuilder.class);
-        Mockito.when(typedMessageBuilder.value(anyString())).thenReturn(typedMessageBuilder);
-        Mockito.when(typedMessageBuilder.sendAsync()).thenReturn(CompletableFuture.anyOf());
-        Mockito.when(context.newOutputMessage(anyString(), anyObject())).thenReturn(typedMessageBuilder);
+        TypedMessageBuilder typedMessageBuilder = mock(TypedMessageBuilder.class);
+        when(typedMessageBuilder.value(any())).thenReturn(typedMessageBuilder);
+        when(typedMessageBuilder.sendAsync()).thenReturn(CompletableFuture.anyOf());
+        when(context.newOutputMessage(anyString(), any())).thenReturn(typedMessageBuilder);
 
         long[] timestamps = {603, 605, 607, 618, 626, 636, 600};
         List<Long> events = new ArrayList<>(timestamps.length);
 
         for (long ts : timestamps) {
             events.add(ts);
-            Record<?> record = Mockito.mock(Record.class);
-            Mockito.doReturn(Optional.of("test-topic")).when(record).getTopicName();
-            Mockito.doReturn(record).when(context).getCurrentRecord();
-            Mockito.doReturn(ts).when(record).getValue();
+            Record<?> record = mock(Record.class);
+            doReturn(Optional.of("test-topic")).when(record).getTopicName();
+            doReturn(record).when(context).getCurrentRecord();
+            doReturn(ts).when(record).getValue();
             testWindowedPulsarFunction.process(ts, context);
 
             //Update the watermark to this timestamp
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProviderTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProviderTest.java
index d31767a..4c63a4f 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProviderTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProviderTest.java
@@ -18,16 +18,23 @@
  */
 package org.apache.pulsar.functions.auth;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
 import io.kubernetes.client.ApiException;
 import io.kubernetes.client.apis.CoreV1Api;
 import io.kubernetes.client.models.V1Container;
 import io.kubernetes.client.models.V1PodSpec;
 import io.kubernetes.client.models.V1PodTemplateSpec;
 import io.kubernetes.client.models.V1Secret;
-import io.kubernetes.client.models.V1ServiceAccount;
 import io.kubernetes.client.models.V1StatefulSet;
 import io.kubernetes.client.models.V1StatefulSetSpec;
-import lombok.extern.slf4j.Slf4j;
+
+import java.util.Collections;
+import java.util.Optional;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.client.impl.auth.AuthenticationToken;
@@ -35,16 +42,7 @@ import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import java.util.Collections;
-import java.util.Optional;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
 
-@Slf4j
 public class KubernetesSecretsTokenAuthProviderTest {
 
     @Test
diff --git a/pulsar-functions/secrets/src/test/java/org/apache/pulsar/functions/secretsprovider/EnvironmentBasedSecretsProviderTest.java b/pulsar-functions/secrets/src/test/java/org/apache/pulsar/functions/secretsprovider/EnvironmentBasedSecretsProviderTest.java
index 3392e9a..3ebb526 100644
--- a/pulsar-functions/secrets/src/test/java/org/apache/pulsar/functions/secretsprovider/EnvironmentBasedSecretsProviderTest.java
+++ b/pulsar-functions/secrets/src/test/java/org/apache/pulsar/functions/secretsprovider/EnvironmentBasedSecretsProviderTest.java
@@ -19,12 +19,12 @@
 
 package org.apache.pulsar.functions.secretsprovider;
 
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
 import java.lang.reflect.Field;
 import java.util.Map;
 
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
 /**
  * Unit test of {@link Exceptions}.
  */
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ActionsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ActionsTest.java
index 309e466..f728f67 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ActionsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ActionsTest.java
@@ -23,7 +23,7 @@ import org.testng.annotations.Test;
 
 import java.util.function.Supplier;
 
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinatorTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinatorTest.java
index 5d2fc3f..763465b 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinatorTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinatorTest.java
@@ -18,8 +18,8 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
index c93569b..ca21c8b 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
@@ -18,8 +18,7 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
index 75e83bf..f16bd3f 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
@@ -18,9 +18,9 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.argThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
@@ -30,7 +30,6 @@ import static org.mockito.Mockito.when;
 import java.util.HashMap;
 import java.util.Map;
 
-import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
@@ -43,7 +42,6 @@ import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-@Slf4j
 public class FunctionMetaDataManagerTest {
 
     private static PulsarClient mockPulsarClient() throws PulsarClientException {
@@ -113,25 +111,20 @@ public class FunctionMetaDataManagerTest {
         verify(functionMetaDataManager, times(1)).submit(any(Request.ServiceRequest.class));
         verify(functionMetaDataManager).submit(argThat(new ArgumentMatcher<Request.ServiceRequest>() {
             @Override
-            public boolean matches(Object o) {
-                if (o instanceof Request.ServiceRequest) {
-                    Request.ServiceRequest serviceRequest = (Request.ServiceRequest) o;
-                    if (!serviceRequest.getWorkerId().equals(workerConfig.getWorkerId())) {
-                        return false;
-                    }
-                    if (!serviceRequest.getServiceRequestType().equals(Request.ServiceRequest.ServiceRequestType
-                            .UPDATE)) {
-                        return false;
-                    }
-                    if (!serviceRequest.getFunctionMetaData().equals(m1)) {
-                        return false;
-                    }
-                    if (serviceRequest.getFunctionMetaData().getVersion() != 0) {
-                        return false;
-                    }
-                    return true;
+            public boolean matches(Request.ServiceRequest serviceRequest) {
+                if (!serviceRequest.getWorkerId().equals(workerConfig.getWorkerId())) {
+                    return false;
                 }
-                return false;
+                if (!serviceRequest.getServiceRequestType().equals(Request.ServiceRequest.ServiceRequestType.UPDATE)) {
+                    return false;
+                }
+                if (!serviceRequest.getFunctionMetaData().equals(m1)) {
+                    return false;
+                }
+                if (serviceRequest.getFunctionMetaData().getVersion() != 0) {
+                    return false;
+                }
+                return true;
             }
         }));
 
@@ -154,23 +147,20 @@ public class FunctionMetaDataManagerTest {
         verify(functionMetaDataManager, times(1)).submit(any(Request.ServiceRequest.class));
         verify(functionMetaDataManager).submit(argThat(new ArgumentMatcher<Request.ServiceRequest>() {
             @Override
-            public boolean matches(Object o) {
-                if (o instanceof Request.ServiceRequest) {
-                    Request.ServiceRequest serviceRequest = (Request.ServiceRequest) o;
-                    if (!serviceRequest.getWorkerId().equals(workerConfig.getWorkerId())) return false;
-                    if (!serviceRequest.getServiceRequestType().equals(
-                            Request.ServiceRequest.ServiceRequestType.UPDATE)) {
-                        return false;
-                    }
-                    if (!serviceRequest.getFunctionMetaData().getFunctionDetails().equals(m2.getFunctionDetails())) {
-                        return false;
-                    }
-                    if (serviceRequest.getFunctionMetaData().getVersion() != (version + 1)) {
-                        return false;
-                    }
-                    return true;
+            public boolean matches(Request.ServiceRequest serviceRequest) {
+                if (!serviceRequest.getWorkerId().equals(workerConfig.getWorkerId()))
+                    return false;
+                if (!serviceRequest.getServiceRequestType().equals(
+                        Request.ServiceRequest.ServiceRequestType.UPDATE)) {
+                    return false;
                 }
-                return false;
+                if (!serviceRequest.getFunctionMetaData().getFunctionDetails().equals(m2.getFunctionDetails())) {
+                    return false;
+                }
+                if (serviceRequest.getFunctionMetaData().getVersion() != (version + 1)) {
+                    return false;
+                }
+                return true;
             }
         }));
 
@@ -205,36 +195,30 @@ public class FunctionMetaDataManagerTest {
         functionMetaDataManager.changeFunctionInstanceStatus("tenant-1", "namespace-1", "func-1", 0, false);
 
         verify(functionMetaDataManager, times(1)).submit(any(Request.ServiceRequest.class));
-        verify(functionMetaDataManager).submit(argThat(new ArgumentMatcher<Request.ServiceRequest>() {
-            @Override
-            public boolean matches(Object o) {
-                if (o instanceof Request.ServiceRequest) {
-                    Request.ServiceRequest serviceRequest = (Request.ServiceRequest) o;
-                    if (!serviceRequest.getWorkerId().equals(workerConfig.getWorkerId())) return false;
-                    if (!serviceRequest.getServiceRequestType().equals(
-                            Request.ServiceRequest.ServiceRequestType.UPDATE)) {
-                        return false;
-                    }
-                    if (!serviceRequest.getFunctionMetaData().getFunctionDetails().equals(f1.getFunctionDetails())) {
-                        return false;
-                    }
-                    if (serviceRequest.getFunctionMetaData().getVersion() != (version + 1)) {
-                        return false;
-                    }
-                    Map<Integer, Function.FunctionState> stateMap = serviceRequest.getFunctionMetaData().getInstanceStatesMap();
-                    if (stateMap == null || stateMap.isEmpty()) {
-                        return false;
-                    }
-                    if (stateMap.get(1) != Function.FunctionState.RUNNING) {
-                        return false;
-                    }
-                    if (stateMap.get(0) != Function.FunctionState.STOPPED) {
-                        return false;
-                    }
-                    return true;
-                }
+        verify(functionMetaDataManager).submit(argThat(serviceRequest -> {
+            if (!serviceRequest.getWorkerId().equals(workerConfig.getWorkerId()))
+                return false;
+            if (!serviceRequest.getServiceRequestType().equals(
+                    Request.ServiceRequest.ServiceRequestType.UPDATE)) {
+                return false;
+            }
+            if (!serviceRequest.getFunctionMetaData().getFunctionDetails().equals(f1.getFunctionDetails())) {
+                return false;
+            }
+            if (serviceRequest.getFunctionMetaData().getVersion() != (version + 1)) {
+                return false;
+            }
+            Map<Integer, Function.FunctionState> stateMap = serviceRequest.getFunctionMetaData().getInstanceStatesMap();
+            if (stateMap == null || stateMap.isEmpty()) {
+                return false;
+            }
+            if (stateMap.get(1) != Function.FunctionState.RUNNING) {
+                return false;
+            }
+            if (stateMap.get(0) != Function.FunctionState.STOPPED) {
                 return false;
             }
+            return true;
         }));
     }
 
@@ -261,23 +245,20 @@ public class FunctionMetaDataManagerTest {
         verify(functionMetaDataManager, times(1)).submit(any(Request.ServiceRequest.class));
         verify(functionMetaDataManager).submit(argThat(new ArgumentMatcher<Request.ServiceRequest>() {
             @Override
-            public boolean matches(Object o) {
-                if (o instanceof Request.ServiceRequest) {
-                    Request.ServiceRequest serviceRequest = (Request.ServiceRequest) o;
-                    if (!serviceRequest.getWorkerId().equals(workerConfig.getWorkerId())) return false;
-                    if (!serviceRequest.getServiceRequestType().equals(
-                            Request.ServiceRequest.ServiceRequestType.DELETE)) {
-                        return false;
-                    }
-                    if (!serviceRequest.getFunctionMetaData().getFunctionDetails().equals(m1.getFunctionDetails())) {
-                        return false;
-                    }
-                    if (serviceRequest.getFunctionMetaData().getVersion() != (version + 1)) {
-                        return false;
-                    }
-                    return true;
+            public boolean matches(Request.ServiceRequest serviceRequest) {
+                if (!serviceRequest.getWorkerId().equals(workerConfig.getWorkerId()))
+                    return false;
+                if (!serviceRequest.getServiceRequestType().equals(
+                        Request.ServiceRequest.ServiceRequestType.DELETE)) {
+                    return false;
                 }
-                return false;
+                if (!serviceRequest.getFunctionMetaData().getFunctionDetails().equals(m1.getFunctionDetails())) {
+                    return false;
+                }
+                if (serviceRequest.getFunctionMetaData().getVersion() != (version + 1)) {
+                    return false;
+                }
+                return true;
             }
         }));
     }
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailerTest.java
index fee6021..bad0fff 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailerTest.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -79,6 +79,6 @@ public class FunctionMetaDataTopicTailerTest {
         receiveFuture.thenApply(Function.identity()).get();
 
         verify(reader, times(2)).readNextAsync();
-        verify(fsm, times(1)).processRequest(any(MessageId.class), any(ServiceRequest.class));
+        verify(fsm, times(1)).processRequest(any(), any(ServiceRequest.class));
     }
 }
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index b009567..399b355 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -44,11 +44,11 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.argThat;
 import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyBoolean;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.argThat;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.reset;
@@ -131,16 +131,11 @@ public class FunctionRuntimeManagerTest {
         verify(functionActioner, times(1)).startFunction(any(FunctionRuntimeInfo.class));
         verify(functionActioner).startFunction(argThat(new ArgumentMatcher<FunctionRuntimeInfo>() {
             @Override
-            public boolean matches(Object o) {
-                if (o instanceof FunctionRuntimeInfo) {
-                    FunctionRuntimeInfo functionRuntimeInfo = (FunctionRuntimeInfo) o;
-
-                    if (!functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().equals(function1)) {
-                        return false;
-                    }
-                    return true;
+            public boolean matches(FunctionRuntimeInfo functionRuntimeInfo) {
+                if (!functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().equals(function1)) {
+                    return false;
                 }
-                return false;
+                return true;
             }
         }));
         verify(functionActioner, times(0)).stopFunction(any(FunctionRuntimeInfo.class));
@@ -231,16 +226,11 @@ public class FunctionRuntimeManagerTest {
         verify(functionActioner, times(1)).terminateFunction(any(FunctionRuntimeInfo.class));
         verify(functionActioner).terminateFunction(argThat(new ArgumentMatcher<FunctionRuntimeInfo>() {
             @Override
-            public boolean matches(Object o) {
-                if (o instanceof FunctionRuntimeInfo) {
-                    FunctionRuntimeInfo functionRuntimeInfo = (FunctionRuntimeInfo) o;
-
-                    if (!functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().equals(function1)) {
-                        return false;
-                    }
-                    return true;
+            public boolean matches(FunctionRuntimeInfo functionRuntimeInfo) {
+                if (!functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().equals(function1)) {
+                    return false;
                 }
-                return false;
+                return true;
             }
         }));
 
@@ -329,32 +319,22 @@ public class FunctionRuntimeManagerTest {
 
         verify(functionActioner).stopFunction(argThat(new ArgumentMatcher<FunctionRuntimeInfo>() {
             @Override
-            public boolean matches(Object o) {
-                if (o instanceof FunctionRuntimeInfo) {
-                    FunctionRuntimeInfo functionRuntimeInfo = (FunctionRuntimeInfo) o;
-
-                    if (!functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().equals(function2)) {
-                        return false;
-                    }
-                    return true;
+            public boolean matches(FunctionRuntimeInfo functionRuntimeInfo) {
+                if (!functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().equals(function2)) {
+                    return false;
                 }
-                return false;
+                return true;
             }
         }));
 
         verify(functionActioner, times(1)).startFunction(any(FunctionRuntimeInfo.class));
         verify(functionActioner).startFunction(argThat(new ArgumentMatcher<FunctionRuntimeInfo>() {
             @Override
-            public boolean matches(Object o) {
-                if (o instanceof FunctionRuntimeInfo) {
-                    FunctionRuntimeInfo functionRuntimeInfo = (FunctionRuntimeInfo) o;
-
-                    if (!functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().equals(function2)) {
-                        return false;
-                    }
-                    return true;
+            public boolean matches(FunctionRuntimeInfo functionRuntimeInfo) {
+                if (!functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().equals(function2)) {
+                    return false;
                 }
-                return false;
+                return true;
             }
         }));
 
@@ -385,19 +365,11 @@ public class FunctionRuntimeManagerTest {
         // make sure terminate is not called since this is a update operation
         verify(functionActioner, times(0)).terminateFunction(any(FunctionRuntimeInfo.class));
 
-        verify(functionActioner).stopFunction(argThat(new ArgumentMatcher<FunctionRuntimeInfo>() {
-            @Override
-            public boolean matches(Object o) {
-                if (o instanceof FunctionRuntimeInfo) {
-                    FunctionRuntimeInfo functionRuntimeInfo = (FunctionRuntimeInfo) o;
-
-                    if (!functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().equals(function2)) {
-                        return false;
-                    }
-                    return true;
+        verify(functionActioner).stopFunction(argThat(functionRuntimeInfo -> {
+                if (!functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().equals(function2)) {
+                    return false;
                 }
-                return false;
-            }
+                return true;
         }));
 
         verify(functionActioner, times(0)).startFunction(any(FunctionRuntimeInfo.class));
@@ -630,33 +602,17 @@ public class FunctionRuntimeManagerTest {
         verify(functionActioner, times(1)).stopFunction(any(FunctionRuntimeInfo.class));
         verify(functionActioner, times(0)).terminateFunction(any(FunctionRuntimeInfo.class));
 
-        verify(functionActioner).startFunction(argThat(new ArgumentMatcher<FunctionRuntimeInfo>() {
-            @Override
-            public boolean matches(Object o) {
-                if (o instanceof FunctionRuntimeInfo) {
-                    FunctionRuntimeInfo functionRuntimeInfo = (FunctionRuntimeInfo) o;
-
-                    if (!functionRuntimeInfo.getFunctionInstance().equals(assignment1.getInstance())) {
-                        return false;
-                    }
-                    return true;
+        verify(functionActioner).startFunction(argThat(functionRuntimeInfo -> {
+                if (!functionRuntimeInfo.getFunctionInstance().equals(assignment1.getInstance())) {
+                    return false;
                 }
-                return false;
-            }
+                return true;
         }));
-        verify(functionActioner).stopFunction(argThat(new ArgumentMatcher<FunctionRuntimeInfo>() {
-            @Override
-            public boolean matches(Object o) {
-                if (o instanceof FunctionRuntimeInfo) {
-                    FunctionRuntimeInfo functionRuntimeInfo = (FunctionRuntimeInfo) o;
-
-                    if (functionRuntimeInfo.getRuntimeSpawner() != null) {
-                        return false;
-                    }
-                    return true;
+        verify(functionActioner).stopFunction(argThat(functionRuntimeInfo -> {
+                if (functionRuntimeInfo.getRuntimeSpawner() != null) {
+                    return false;
                 }
-                return false;
-            }
+                return true;
         }));
 
         Assert.assertEquals(functionRuntimeManager.functionRuntimeInfoMap.size(), 1);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
index cfbfa47..494c93e 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
@@ -18,9 +18,9 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.argThat;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
@@ -30,7 +30,6 @@ import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 
-import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
@@ -48,7 +47,6 @@ import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.functions.WorkerInfo;
 import org.apache.pulsar.functions.proto.Function;
-import org.mockito.ArgumentMatcher;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -85,7 +83,7 @@ public class MembershipManagerTest {
         AtomicReference<ConsumerEventListener> listenerHolder = new AtomicReference<>();
         when(mockConsumerBuilder.consumerEventListener(any(ConsumerEventListener.class))).thenAnswer(invocationOnMock -> {
 
-            ConsumerEventListener listener = invocationOnMock.getArgumentAt(0, ConsumerEventListener.class);
+            ConsumerEventListener listener = invocationOnMock.getArgument(0);
             listenerHolder.set(listener);
 
             return mockConsumerBuilder;
@@ -270,20 +268,7 @@ public class MembershipManagerTest {
         membershipManager.checkFailures(functionMetaDataManager, functionRuntimeManager, schedulerManager);
 
         verify(functionRuntimeManager, times(1)).removeAssignments(
-                argThat(new ArgumentMatcher<Collection<Function.Assignment>>() {
-            @Override
-            public boolean matches(Object o) {
-                if (o instanceof Collection) {
-                    Collection<Function.Assignment> assignments = (Collection) o;
-
-                    if (!assignments.contains(assignment2)) {
-                        return false;
-                    }
-                    return true;
-                }
-                return false;
-            }
-        }));
+                argThat(assignments -> assignments.contains(assignment2)));
 
         verify(schedulerManager, times(1)).schedule();
     }
@@ -437,5 +422,5 @@ public class MembershipManagerTest {
         verify(functionRuntimeManager, times(0)).removeAssignments(any());
         Assert.assertEquals(membershipManager.unsignedFunctionDurations.size(), 0);
     }
-    
+
 }
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index 9f4fa2e..1511780 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -57,10 +57,10 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyBoolean;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/dlog/DLInputStreamTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/dlog/DLInputStreamTest.java
index ab0adb4..5821e22 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/dlog/DLInputStreamTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/dlog/DLInputStreamTest.java
@@ -19,9 +19,9 @@
 package org.apache.pulsar.functions.worker.dlog;
 
 import static com.google.common.base.Charsets.UTF_8;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyBoolean;
+import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/dlog/DLOutputStreamTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/dlog/DLOutputStreamTest.java
index 5a3fa05..d7676eb 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/dlog/DLOutputStreamTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/dlog/DLOutputStreamTest.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.functions.worker.dlog;
 
-import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/executor/MockExecutorController.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/executor/MockExecutorController.java
index 151e4b1..a0f051f 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/executor/MockExecutorController.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/executor/MockExecutorController.java
@@ -19,8 +19,8 @@
 package org.apache.pulsar.functions.worker.executor;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
 import static org.mockito.Mockito.doAnswer;
 
 import com.google.common.collect.Lists;
@@ -142,10 +142,10 @@ public class MockExecutorController {
 
     private static Answer<ScheduledFuture<?>> answerAtFixedRate(MockExecutorController controller, int numTimes) {
         return invocationOnMock -> {
-            Runnable task = invocationOnMock.getArgumentAt(0, Runnable.class);
-            long initialDelay = invocationOnMock.getArgumentAt(1, long.class);
-            long delay = invocationOnMock.getArgumentAt(2, long.class);
-            TimeUnit unit = invocationOnMock.getArgumentAt(3, TimeUnit.class);
+            Runnable task = invocationOnMock.getArgument(0);
+            long initialDelay = invocationOnMock.getArgument(1);
+            long delay = invocationOnMock.getArgument(2);
+            TimeUnit unit = invocationOnMock.getArgument(3);
 
             DeferredTask deferredTask = null;
             for (int i = 0; i < numTimes; i++) {
@@ -163,9 +163,9 @@ public class MockExecutorController {
     private static Answer<ScheduledFuture<?>> answerDelay(MockExecutorController executor) {
         return invocationOnMock -> {
 
-           Runnable task = invocationOnMock.getArgumentAt(0, Runnable.class);
-           long value = invocationOnMock.getArgumentAt(1, long.class);
-           TimeUnit unit = invocationOnMock.getArgumentAt(2, TimeUnit.class);
+           Runnable task = invocationOnMock.getArgument(0);
+           long value = invocationOnMock.getArgument(1);
+           TimeUnit unit = invocationOnMock.getArgument(2);
            DeferredTask deferredTask = executor.addDelayedTask(executor, unit.toMillis(value), task);
            if (value <= 0) {
                task.run();
@@ -178,7 +178,7 @@ public class MockExecutorController {
     private static Answer<Future<?>> answerNow() {
         return invocationOnMock -> {
 
-           Runnable task = invocationOnMock.getArgumentAt(0, Runnable.class);
+           Runnable task = invocationOnMock.getArgument(0);
            task.run();
            SettableFuture<Void> future = SettableFuture.create();
            future.set(null);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/request/ServiceRequestManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/request/ServiceRequestManagerTest.java
index 91fa0e6..501ce1a 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/request/ServiceRequestManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/request/ServiceRequestManagerTest.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.functions.worker.request;
 
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
index dcfa8f1..4e0c5fb 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
@@ -62,9 +62,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index 62cba58..20988e0 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -18,11 +18,39 @@
  */
 package org.apache.pulsar.functions.worker.rest.api.v2;
 
+
+import static org.apache.pulsar.functions.utils.FunctionCommon.mergeJson;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.doNothing;
+import static org.powermock.api.mockito.PowerMockito.doThrow;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.testng.Assert.assertEquals;
+
 import com.google.common.collect.Lists;
 import com.google.gson.Gson;
 import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.util.JsonFormat;
-import lombok.extern.slf4j.Slf4j;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.StreamingOutput;
+
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.core.config.Configurator;
@@ -65,38 +93,11 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.ObjectFactory;
 import org.testng.annotations.Test;
 
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.StreamingOutput;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-
-import static org.apache.pulsar.functions.utils.FunctionCommon.mergeJson;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
-import static org.powermock.api.mockito.PowerMockito.doNothing;
-import static org.powermock.api.mockito.PowerMockito.doThrow;
-import static org.powermock.api.mockito.PowerMockito.mockStatic;
-import static org.testng.Assert.assertEquals;
-
 /**
  * Unit test of {@link FunctionsApiV2Resource}.
  */
 @PrepareForTest({WorkerUtils.class, InstanceUtils.class})
 @PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.functions.api.*" })
-@Slf4j
 public class FunctionApiV2ResourceTest {
 
     @ObjectFactory
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
index 6075d68..4bb08c1 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
@@ -18,8 +18,35 @@
  */
 package org.apache.pulsar.functions.worker.rest.api.v3;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.doNothing;
+import static org.powermock.api.mockito.PowerMockito.doThrow;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.testng.Assert.assertEquals;
+
 import com.google.common.collect.Lists;
-import lombok.extern.slf4j.Slf4j;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.StreamingOutput;
+
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.core.config.Configurator;
@@ -62,38 +89,11 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.ObjectFactory;
 import org.testng.annotations.Test;
 
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.StreamingOutput;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
-import static org.powermock.api.mockito.PowerMockito.doNothing;
-import static org.powermock.api.mockito.PowerMockito.doThrow;
-import static org.powermock.api.mockito.PowerMockito.mockStatic;
-import static org.testng.Assert.assertEquals;
-
 /**
  * Unit test of {@link FunctionsApiV3Resource}.
  */
 @PrepareForTest({WorkerUtils.class, InstanceUtils.class})
 @PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.functions.api.*" })
-@Slf4j
 public class FunctionApiV3ResourceTest {
 
     @ObjectFactory
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
index 41b3204..c47f9e5 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.functions.worker.rest.api.v3;
 
 import com.google.common.collect.Lists;
-import lombok.extern.slf4j.Slf4j;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.core.config.Configurator;
@@ -65,7 +64,6 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URL;
-import java.nio.file.Path;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -74,9 +72,9 @@ import java.util.concurrent.CompletableFuture;
 
 import static org.apache.pulsar.functions.proto.Function.ProcessingGuarantees.ATLEAST_ONCE;
 import static org.apache.pulsar.functions.source.TopicSchema.DEFAULT_SERDE;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
@@ -91,7 +89,7 @@ import static org.testng.Assert.assertEquals;
  */
 @PrepareForTest({WorkerUtils.class, SinkConfigUtils.class, ConnectorUtils.class, FunctionCommon.class, InstanceUtils.class})
 @PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.io.*", "java.io.*" })
-@Slf4j
+
 public class SinkApiV3ResourceTest {
 
     @ObjectFactory
@@ -819,7 +817,7 @@ public class SinkApiV3ResourceTest {
         FunctionCommon.getSinkType(anyString(), any(NarClassLoader.class));
 
         doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
-        FunctionCommon.extractNarClassLoader(any(Path.class), any(File.class));
+        FunctionCommon.extractNarClassLoader(any(), any());
 
         doReturn(ATLEAST_ONCE).when(FunctionCommon.class);
         FunctionCommon.convertProcessingGuarantee(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
@@ -890,7 +888,7 @@ public class SinkApiV3ResourceTest {
         FunctionCommon.getSinkType(anyString(), any(NarClassLoader.class));
 
         doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
-        FunctionCommon.extractNarClassLoader(any(Path.class), any(File.class));
+        FunctionCommon.extractNarClassLoader(any(), any());
 
         doReturn(ATLEAST_ONCE).when(FunctionCommon.class);
         FunctionCommon.convertProcessingGuarantee(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
@@ -988,7 +986,7 @@ public class SinkApiV3ResourceTest {
         PowerMockito.when(FunctionCommon.class, "extractFileFromPkgURL", any()).thenCallRealMethod();
 
         doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
-        FunctionCommon.extractNarClassLoader(any(Path.class), any(File.class));
+        FunctionCommon.extractNarClassLoader(any(), any());
 
         doReturn(ATLEAST_ONCE).when(FunctionCommon.class);
         FunctionCommon.convertProcessingGuarantee(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
index 276f3c0..faf3f88 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
@@ -18,9 +18,31 @@
  */
 package org.apache.pulsar.functions.worker.rest.api.v3;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.doNothing;
+import static org.powermock.api.mockito.PowerMockito.doReturn;
+import static org.powermock.api.mockito.PowerMockito.doThrow;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.testng.Assert.assertEquals;
+
 import com.google.common.collect.Lists;
-import com.google.gson.Gson;
-import lombok.extern.slf4j.Slf4j;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import javax.ws.rs.core.Response;
+
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.core.config.Configurator;
@@ -63,35 +85,11 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.ObjectFactory;
 import org.testng.annotations.Test;
 
-import javax.ws.rs.core.Response;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URL;
-import java.nio.file.Path;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
-import static org.powermock.api.mockito.PowerMockito.doNothing;
-import static org.powermock.api.mockito.PowerMockito.doReturn;
-import static org.powermock.api.mockito.PowerMockito.doThrow;
-import static org.powermock.api.mockito.PowerMockito.mockStatic;
-import static org.testng.Assert.assertEquals;
-
 /**
  * Unit test of {@link SourcesApiV3Resource}.
  */
 @PrepareForTest({WorkerUtils.class, ConnectorUtils.class, FunctionCommon.class, InstanceUtils.class})
 @PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.io.*" })
-@Slf4j
 public class SourceApiV3ResourceTest {
 
     @ObjectFactory
@@ -839,7 +837,7 @@ public class SourceApiV3ResourceTest {
         FunctionCommon.getSourceType(anyString(), any(NarClassLoader.class));
 
         doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
-        FunctionCommon.extractNarClassLoader(any(Path.class), any(File.class));
+        FunctionCommon.extractNarClassLoader(any(), any());
 
         this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
         when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData);
@@ -909,7 +907,7 @@ public class SourceApiV3ResourceTest {
         FunctionCommon.getSourceType(anyString(), any(NarClassLoader.class));
 
         doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
-        FunctionCommon.extractNarClassLoader(any(Path.class), any(File.class));
+        FunctionCommon.extractNarClassLoader(any(), any(File.class));
 
         this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
         when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData);
@@ -1003,7 +1001,7 @@ public class SourceApiV3ResourceTest {
         PowerMockito.when(FunctionCommon.class, "extractFileFromPkgURL", any()).thenCallRealMethod();
 
         doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
-        FunctionCommon.extractNarClassLoader(any(Path.class), any(File.class));
+        FunctionCommon.extractNarClassLoader(any(), any());
 
         this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
         when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData);
diff --git a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileConsumerThreadTests.java b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileConsumerThreadTests.java
index a21633d..a4582bc 100644
--- a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileConsumerThreadTests.java
+++ b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileConsumerThreadTests.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.io.file;
 
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
diff --git a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileListingThreadTests.java b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileListingThreadTests.java
index 855498e..5be101f 100644
--- a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileListingThreadTests.java
+++ b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileListingThreadTests.java
@@ -19,7 +19,7 @@
 package org.apache.pulsar.io.file;
 
 import static org.testng.Assert.fail;
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
diff --git a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/ProcessedFileThreadTests.java b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/ProcessedFileThreadTests.java
index 98d35a9..9970a55 100644
--- a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/ProcessedFileThreadTests.java
+++ b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/ProcessedFileThreadTests.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.io.file;
 
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
diff --git a/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSinkTest.java b/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSinkTest.java
index fe8941e..a746f7e 100644
--- a/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSinkTest.java
+++ b/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSinkTest.java
@@ -19,8 +19,6 @@
 package org.apache.pulsar.io.influxdb;
 
 import com.google.common.collect.Maps;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
@@ -46,7 +44,7 @@ import org.testng.annotations.Test;
 
 import java.util.Map;
 
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -151,7 +149,7 @@ public class InfluxDBGenericRecordSinkTest {
         verify(this.influxDB, times(1)).createDatabase("testDB");
 
         doAnswer(invocationOnMock -> {
-            BatchPoints batchPoints = invocationOnMock.getArgumentAt(0, BatchPoints.class);
+            BatchPoints batchPoints = invocationOnMock.getArgument(0, BatchPoints.class);
             Assert.assertNotNull(batchPoints, "batchPoints should not be null.");
             return null;
         }).when(influxDB).write(any(BatchPoints.class));
diff --git a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java
index 3eb6f6e..7dc665d 100644
--- a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java
+++ b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java
@@ -18,21 +18,14 @@
  */
 package org.apache.pulsar.io.mongodb;
 
+import static java.util.stream.Collectors.toList;
+
 import com.google.common.collect.Lists;
 import com.mongodb.MongoBulkWriteException;
 import com.mongodb.async.client.MongoClient;
 import com.mongodb.async.client.MongoClients;
 import com.mongodb.async.client.MongoCollection;
 import com.mongodb.async.client.MongoDatabase;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.Sink;
-import org.apache.pulsar.io.core.SinkContext;
-import org.apache.pulsar.io.core.annotations.Connector;
-import org.apache.pulsar.io.core.annotations.IOType;
-import org.bson.BSONException;
-import org.bson.Document;
-import org.bson.json.JsonParseException;
 
 import java.nio.charset.Charset;
 import java.util.ArrayList;
@@ -42,9 +35,19 @@ import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 import java.util.stream.IntStream;
 
-import static java.util.stream.Collectors.toList;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
+import org.bson.BSONException;
+import org.bson.Document;
+import org.bson.json.JsonParseException;
 
 /**
  * The base class for MongoDB sinks.
@@ -70,6 +73,15 @@ public class MongoSink implements Sink<byte[]> {
 
     private ScheduledExecutorService flushExecutor;
 
+    private Supplier<MongoClient> clientProvider;
+
+    public MongoSink() {
+        this(null);
+    }
+
+    public MongoSink(Supplier<MongoClient> clientProvider) {
+        this.clientProvider = clientProvider;
+    }
 
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
@@ -78,7 +90,12 @@ public class MongoSink implements Sink<byte[]> {
         mongoConfig = MongoConfig.load(config);
         mongoConfig.validate();
 
-        mongoClient = MongoClients.create(mongoConfig.getMongoUri());
+        if (clientProvider != null) {
+            mongoClient = clientProvider.get();
+        } else {
+            mongoClient = MongoClients.create(mongoConfig.getMongoUri());
+        }
+
         final MongoDatabase db = mongoClient.getDatabase(mongoConfig.getDatabase());
         collection = db.getCollection(mongoConfig.getCollection());
 
diff --git a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkTest.java b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkTest.java
index c941708..c4ca44a 100644
--- a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkTest.java
+++ b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkTest.java
@@ -19,36 +19,35 @@
 
 package org.apache.pulsar.io.mongodb;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 import com.mongodb.MongoBulkWriteException;
 import com.mongodb.async.SingleResultCallback;
 import com.mongodb.async.client.MongoClient;
-import com.mongodb.async.client.MongoClients;
 import com.mongodb.async.client.MongoCollection;
 import com.mongodb.async.client.MongoDatabase;
 import com.mongodb.bulk.BulkWriteError;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.SinkContext;
 import org.bson.BsonDocument;
 import org.mockito.Mock;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.testng.IObjectFactory;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.ObjectFactory;
 import org.testng.annotations.Test;
 
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.*;
-
-@PrepareForTest(MongoClients.class)
-@PowerMockIgnore({"org.apache.logging.log4j.*"})
 public class MongoSinkTest {
 
     @Mock
@@ -78,7 +77,7 @@ public class MongoSinkTest {
 
     @BeforeMethod
     public void setUp() {
-        sink = new MongoSink();
+
         map = TestHelper.createMap(true);
 
         mockRecord = mock(Record.class);
@@ -87,9 +86,8 @@ public class MongoSinkTest {
         mockMongoDb = mock(MongoDatabase.class);
         mockMongoColl = mock(MongoCollection.class);
 
-        PowerMockito.mockStatic(MongoClients.class);
+        sink = new MongoSink(() -> mockMongoClient);
 
-        when(MongoClients.create(anyString())).thenReturn(mockMongoClient);
         when(mockMongoClient.getDatabase(anyString())).thenReturn(mockMongoDb);
         when(mockMongoDb.getCollection(anyString())).thenReturn(mockMongoColl);
     }
@@ -98,7 +96,7 @@ public class MongoSinkTest {
         when(mockRecord.getValue()).thenReturn("{\"hello\":\"pulsar\"}".getBytes());
 
         doAnswer((invocation) -> {
-            SingleResultCallback cb = invocation.getArgumentAt(1, SingleResultCallback.class);
+            SingleResultCallback cb = invocation.getArgument(1, SingleResultCallback.class);
             MongoBulkWriteException exc = null;
 
             if (throwBulkError) {
@@ -109,17 +107,17 @@ public class MongoSinkTest {
 
             cb.onResult(null, exc);
             return null;
-        }).when(mockMongoColl).insertMany(anyObject(), anyObject());
+        }).when(mockMongoColl).insertMany(any(), any());
     }
 
     private void initFailContext(String msg) {
         when(mockRecord.getValue()).thenReturn(msg.getBytes());
 
         doAnswer((invocation) -> {
-            SingleResultCallback cb = invocation.getArgumentAt(1, SingleResultCallback.class);
+            SingleResultCallback cb = invocation.getArgument(1, SingleResultCallback.class);
             cb.onResult(null, new Exception("Oops"));
             return null;
-        }).when(mockMongoColl).insertMany(anyObject(), anyObject());
+        }).when(mockMongoColl).insertMany(any(), any());
     }
 
     @AfterMethod
diff --git a/pulsar-log4j2-appender/src/main/java/org/apache/pulsar/log4j2/appender/PulsarAppender.java b/pulsar-log4j2-appender/src/main/java/org/apache/pulsar/log4j2/appender/PulsarAppender.java
index 68bfbd5..45aeca9 100644
--- a/pulsar-log4j2-appender/src/main/java/org/apache/pulsar/log4j2/appender/PulsarAppender.java
+++ b/pulsar-log4j2-appender/src/main/java/org/apache/pulsar/log4j2/appender/PulsarAppender.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.log4j2.appender;
 import java.io.Serializable;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.logging.log4j.core.AbstractLifeCycle;
 import org.apache.logging.log4j.core.Appender;
 import org.apache.logging.log4j.core.Filter;
diff --git a/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/PulsarAppenderTest.java b/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/PulsarAppenderTest.java
index 71b94ae..73cbadf 100644
--- a/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/PulsarAppenderTest.java
+++ b/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/PulsarAppenderTest.java
@@ -18,10 +18,10 @@
  */
 package org.apache.pulsar.log4j2.appender;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyString;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -120,11 +120,30 @@ public class PulsarAppenderTest {
         doReturn(producerBuilder).when(producerBuilder).topic(anyString());
         doReturn(producerBuilder).when(producerBuilder).producerName(anyString());
         doReturn(producerBuilder).when(producerBuilder).enableBatching(anyBoolean());
-        doReturn(producerBuilder).when(producerBuilder).batchingMaxPublishDelay(anyInt(), any(TimeUnit.class));
+        doReturn(producerBuilder).when(producerBuilder).batchingMaxPublishDelay(anyLong(), any(TimeUnit.class));
         doReturn(producerBuilder).when(producerBuilder).blockIfQueueFull(anyBoolean());
         doReturn(producer).when(producerBuilder).create();
 
         when(producer.newMessage()).then(invocation -> new MockedMessageBuilder());
+        when(producer.send(any(byte[].class)))
+            .thenAnswer(invocationOnMock -> {
+                Message<byte[]> msg = invocationOnMock.getArgument(0);
+                synchronized (history) {
+                    history.add(msg);
+                }
+                return null;
+            });
+
+        when(producer.sendAsync(any(byte[].class)))
+            .thenAnswer(invocationOnMock -> {
+                Message<byte[]> msg = invocationOnMock.getArgument(0);
+                synchronized (history) {
+                    history.add(msg);
+                }
+                CompletableFuture<MessageId> future = new CompletableFuture<>();
+                future.complete(mock(MessageId.class));
+                return future;
+            });
 
         PulsarManager.PULSAR_CLIENT_BUILDER = () -> clientBuilder;
 
diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE
index f964d37..05261a9 100644
--- a/pulsar-sql/presto-distribution/LICENSE
+++ b/pulsar-sql/presto-distribution/LICENSE
@@ -370,7 +370,6 @@ The Apache Software License, Version 2.0
   * FindBugs JSR305
     - jsr305-3.0.2.jar
   * Objenesis
-    - objenesis-2.1.jar
     - objenesis-2.6.jar
   * Okio
     - okio-1.13.0.jar
@@ -438,7 +437,7 @@ The Apache Software License, Version 2.0
     - jackson-module-jaxb-annotations-2.8.11.jar
     - jackson-module-jsonSchema-2.8.11.jar
   * Java Assist
-    - javassist-3.21.0-GA.jar
+    - javassist-3.25.0-GA.jar
   * Jetty
     - jetty-http-9.4.12.v20180830.jar
     - jetty-io-9.4.12.v20180830.jar
diff --git a/pulsar-sql/presto-distribution/pom.xml b/pulsar-sql/presto-distribution/pom.xml
index 8e2b03a..1b92b80 100644
--- a/pulsar-sql/presto-distribution/pom.xml
+++ b/pulsar-sql/presto-distribution/pom.xml
@@ -237,6 +237,13 @@
                     <header>../../src/license-header.txt</header>
                 </configuration>
             </plugin>
+            <plugin>
+              <artifactId>maven-compiler-plugin</artifactId>
+              <configuration>
+                <source>1.8</source>
+                <target>1.8</target>
+              </configuration>
+            </plugin>
         </plugins>
         <extensions>
             <extension>
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
index c38c0ac..fe3ac57 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
@@ -81,9 +81,9 @@ import static com.facebook.presto.spi.type.DateType.DATE;
 import static com.facebook.presto.spi.type.TimeType.TIME;
 import static com.facebook.presto.spi.type.TimestampType.TIMESTAMP;
 import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyString;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
index a704be6..dfd723e 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
@@ -45,7 +45,7 @@ import java.util.Map;
 
 import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND;
 import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
-import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
index b86a7a5..c02df94 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
@@ -42,8 +42,8 @@ import java.util.stream.Collectors;
 
 import static com.facebook.presto.spi.type.DateTimeEncoding.packDateTimeWithZone;
 import static com.facebook.presto.spi.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyInt;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -291,5 +291,5 @@ public class TestPulsarSplitManager extends TestPulsarConnector {
         }
     }
 
-    
+
 }
diff --git a/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java b/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
index 5764ac7..f8a1aa0 100644
--- a/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
+++ b/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
@@ -18,8 +18,8 @@
  */
 package org.apache.pulsar.storm;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyInt;
 import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -33,7 +33,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.pulsar.client.api.ClientBuilder;
@@ -43,26 +42,20 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
-import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
-import org.apache.pulsar.shade.io.netty.buffer.PooledByteBufAllocator;
 import org.apache.pulsar.storm.PulsarSpout.SpoutConsumer;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.tuple.Values;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.testng.annotations.Test;
 
 import com.google.common.collect.Maps;
 
 public class PulsarSpoutTest {
 
-    private static final Logger log = LoggerFactory.getLogger(PulsarSpoutTest.class);
-    
     @Test
     public void testAckFailedMessage() throws Exception {
-        
+
         PulsarSpoutConfiguration conf = new PulsarSpoutConfiguration();
         conf.setServiceUrl("http://localhost:8080");
         conf.setSubscriptionName("sub1");
@@ -77,13 +70,13 @@ public class PulsarSpoutTest {
             @Override
             public void declareOutputFields(OutputFieldsDeclarer declarer) {
             }
-            
+
         });
-        
+
         ClientBuilder builder = spy(new ClientBuilderImpl());
         PulsarSpout spout = spy(new PulsarSpout(conf, builder));
-        
-        Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(), null, Schema.BYTES);
+
+        Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(), new byte[0], Schema.BYTES);
         Consumer<byte[]> consumer = mock(Consumer.class);
         SpoutConsumer spoutConsumer = new SpoutConsumer(consumer);
         CompletableFuture<Void> future = new CompletableFuture<>();
@@ -92,13 +85,13 @@ public class PulsarSpoutTest {
         Field consField = PulsarSpout.class.getDeclaredField("consumer");
         consField.setAccessible(true);
         consField.set(spout, spoutConsumer);
-        
+
         spout.fail(msg);
         spout.ack(msg);
         spout.emitNextAvailableTuple();
         verify(consumer, atLeast(1)).receive(anyInt(), any());
     }
-    
+
     @Test
     public void testPulsarSpout() throws Exception {
         PulsarSpoutConfiguration conf = new PulsarSpoutConfiguration();
@@ -138,9 +131,7 @@ public class PulsarSpoutTest {
         when(client.getSharedConsumer(any())).thenReturn(consumer);
         instances.put(componentId, client);
 
-        ByteBuf data = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
-        data.writeBytes("test".getBytes());
-        Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(), data, Schema.BYTES);
+        Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(), "test".getBytes(), Schema.BYTES);
         when(consumer.receive(anyInt(), any())).thenReturn(msg);
 
         spout.open(config, context, collector);
@@ -149,5 +140,5 @@ public class PulsarSpoutTest {
         assertTrue(called.get());
         verify(consumer, atLeast(1)).receive(anyInt(), any());
     }
-    
+
 }
diff --git a/tiered-storage/jcloud/pom.xml b/tiered-storage/jcloud/pom.xml
index 2967307..ba4fb0d 100644
--- a/tiered-storage/jcloud/pom.xml
+++ b/tiered-storage/jcloud/pom.xml
@@ -118,6 +118,11 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>javax.xml.bind</groupId>
+      <artifactId>jaxb-api</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>javax.annotation</groupId>
       <artifactId>jsr250-api</artifactId>
       <version>1.0</version>
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/BlobStoreBackedInputStreamTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/BlobStoreBackedInputStreamTest.java
index de311bc..16784a5 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/BlobStoreBackedInputStreamTest.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/BlobStoreBackedInputStreamTest.java
@@ -19,7 +19,7 @@
 package org.apache.bookkeeper.mledger.offload.jcloud;
 
 import static org.mockito.AdditionalAnswers.delegatesTo;
-import static org.mockito.Matchers.any;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -29,14 +29,14 @@ import java.io.InputStream;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
+
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreBackedInputStreamImpl;
 import org.jclouds.blobstore.BlobStore;
 import org.jclouds.blobstore.domain.Blob;
-import org.jclouds.blobstore.options.GetOptions;
 import org.jclouds.io.Payload;
 import org.jclouds.io.Payloads;
-import org.mockito.Matchers;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -222,7 +222,7 @@ class BlobStoreBackedInputStreamTest extends BlobStoreTestBase {
         }
 
         verify(spiedBlobStore, times(1))
-            .getBlob(Mockito.eq(BUCKET), Mockito.eq(objectKey), Matchers.<GetOptions>anyObject());
+            .getBlob(Mockito.eq(BUCKET), Mockito.eq(objectKey), any());
     }
 
     @Test
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
index 07308ff..a3c4972 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
@@ -20,9 +20,9 @@ package org.apache.bookkeeper.mledger.offload.jcloud.impl;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.mockito.AdditionalAnswers.delegatesTo;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyLong;
 import static org.mockito.Mockito.mock;
 
 import com.amazonaws.auth.AWSCredentials;
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java
index 5cf6bd5..2803b16 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java
@@ -25,8 +25,10 @@ import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
 import com.google.common.io.ByteStreams;
 import com.google.common.primitives.Ints;
 import com.google.common.primitives.Longs;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+
 import java.io.ByteArrayInputStream;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
@@ -36,8 +38,9 @@ import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
 import java.util.stream.IntStream;
+
 import lombok.Data;
-import lombok.extern.slf4j.Slf4j;
+
 import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
 import org.apache.bookkeeper.client.api.LedgerEntries;
 import org.apache.bookkeeper.client.api.LedgerEntry;
@@ -47,7 +50,6 @@ import org.apache.bookkeeper.mledger.offload.jcloud.DataBlockHeader;
 import org.testng.annotations.Test;
 import org.testng.collections.Lists;
 
-@Slf4j
 public class BlockAwareSegmentInputStreamTest {
     private static final byte DEFAULT_ENTRY_BYTE = 0xB;