You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2019/07/10 06:17:13 UTC
[pulsar] branch master updated: Upgrade to Mockito 2.x (#4671)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 108780c Upgrade to Mockito 2.x (#4671)
108780c is described below
commit 108780c79cc9ec71f4ff8ee3a7231766ef545894
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Jul 9 23:17:07 2019 -0700
Upgrade to Mockito 2.x (#4671)
Upgrading to Mockito 2.28 and PowerMock 2.0. This is a pre-step to be able to run CI with Java 11 / 12
---
buildtools/pom.xml | 9 +-
distribution/server/src/assemble/LICENSE.bin.txt | 4 +-
.../bookkeeper/client/PulsarMockReadHandle.java | 6 +-
.../bookkeeper/mledger/impl/EntryCacheTest.java | 15 +-
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 6 +-
.../mledger/impl/OffloadPrefixReadTest.java | 22 +-
pom.xml | 21 +-
.../pulsar/broker/admin/AdminApiOffloadTest.java | 22 +-
.../apache/pulsar/broker/admin/NamespacesTest.java | 253 +----
.../broker/auth/AuthenticationServiceTest.java | 2 +-
.../broker/cache/ResourceQuotaCacheTest.java | 2 +-
.../delayed/InMemoryDeliveryTrackerTest.java | 8 +-
.../broker/namespace/NamespaceServiceTest.java | 2 +-
.../broker/namespace/OwnershipCacheTest.java | 5 +-
.../pulsar/broker/service/BrokerServiceTest.java | 6 +-
.../PersistentDispatcherFailoverConsumerTest.java | 21 +-
.../service/PersistentTopicConcurrentTest.java | 2 +-
.../pulsar/broker/service/PersistentTopicTest.java | 55 +-
.../pulsar/broker/service/ReplicatorTest.java | 12 +-
.../pulsar/broker/service/ServerCnxTest.java | 44 +-
.../persistent/PersistentSubscriptionTest.java | 62 +-
...ReplicatedSubscriptionsSnapshotBuilderTest.java | 4 +-
...RegistryServiceWithSchemaDataValidatorTest.java | 6 +-
.../pulsar/client/api/BrokerServiceLookupTest.java | 2 +-
.../client/api/SimpleProducerConsumerTest.java | 2 +-
.../client/api/v1/V1_ProducerConsumerTest.java | 2 +-
.../client/impl/BrokerClientIntegrationTest.java | 16 +-
.../pulsar/common/naming/NamespaceBundleTest.java | 2 +-
.../pulsar/common/naming/NamespaceBundlesTest.java | 4 +-
.../apache/pulsar/compaction/CompactionTest.java | 2 +-
.../apache/pulsar/io/PulsarFunctionTlsTest.java | 2 +-
.../clients/producer/PulsarKafkaProducerTest.java | 61 +-
.../KafkaProducerInterceptorWrapperTest.java | 2 +-
.../apache/pulsar/admin/cli/CmdFunctionsTest.java | 6 +-
.../pulsar/admin/cli/PulsarAdminToolTest.java | 36 +-
.../org/apache/pulsar/admin/cli/TestCmdSinks.java | 4 +-
.../apache/pulsar/admin/cli/TestCmdSources.java | 9 +-
.../org/apache/pulsar/client/impl/MessageImpl.java | 5 +
.../pulsar/client/api/MessageRouterTest.java | 2 +-
.../apache/pulsar/client/impl/ClientCnxTest.java | 6 +-
.../apache/pulsar/client/impl/MessageImplTest.java | 2 +-
.../client/impl/ProducerBuilderImplTest.java | 6 +-
.../schema/SupportVersioningAvroSchemaTest.java | 2 +-
.../SupportVersioningKeyValueSchemaTest.java | 2 +-
.../impl/schema/generic/GenericAvroSchemaTest.java | 4 +-
.../impl/schema/generic/GenericSchemaImplTest.java | 2 +-
.../MultiVersionSchemaInfoProviderTest.java | 9 +-
.../apache/pulsar/client/util/ObjectCacheTest.java | 3 +-
.../apache/pulsar/common/naming/TopicNameTest.java | 2 +
.../pulsar/common/protocol/ByteBufPairTest.java | 2 +-
.../pulsar/common/protocol/PulsarDecoderTest.java | 2 +-
.../connectors/pulsar/PulsarAvroTableSinkTest.java | 10 +-
.../pulsar/PulsarConsumerSourceTests.java | 6 +-
.../connectors/pulsar/PulsarJsonTableSinkTest.java | 11 +-
.../pulsar/functions/source/PulsarSource.java | 2 +-
.../pulsar/functions/instance/ContextImplTest.java | 35 +-
.../instance/JavaInstanceRunnableProcessTest.java | 1012 --------------------
.../instance/state/StateContextImplTest.java | 6 +-
.../pulsar/functions/sink/PulsarSinkTest.java | 95 +-
.../pulsar/functions/source/PulsarSourceTest.java | 39 +-
.../windowing/WindowFunctionExecutorTest.java | 22 +-
.../KubernetesSecretsTokenAuthProviderTest.java | 20 +-
.../EnvironmentBasedSecretsProviderTest.java | 6 +-
.../apache/pulsar/functions/utils/ActionsTest.java | 2 +-
.../worker/ClusterServiceCoordinatorTest.java | 4 +-
.../functions/worker/FunctionActionerTest.java | 3 +-
.../worker/FunctionMetaDataManagerTest.java | 147 ++-
.../worker/FunctionMetaDataTopicTailerTest.java | 4 +-
.../worker/FunctionRuntimeManagerTest.java | 108 +--
.../functions/worker/MembershipManagerTest.java | 27 +-
.../functions/worker/SchedulerManagerTest.java | 8 +-
.../functions/worker/dlog/DLInputStreamTest.java | 6 +-
.../functions/worker/dlog/DLOutputStreamTest.java | 2 +-
.../worker/executor/MockExecutorController.java | 20 +-
.../worker/request/ServiceRequestManagerTest.java | 2 +-
.../worker/rest/api/FunctionsImplTest.java | 6 +-
.../rest/api/v2/FunctionApiV2ResourceTest.java | 57 +-
.../rest/api/v3/FunctionApiV3ResourceTest.java | 56 +-
.../worker/rest/api/v3/SinkApiV3ResourceTest.java | 16 +-
.../rest/api/v3/SourceApiV3ResourceTest.java | 56 +-
.../pulsar/io/file/FileConsumerThreadTests.java | 2 +-
.../pulsar/io/file/FileListingThreadTests.java | 2 +-
.../pulsar/io/file/ProcessedFileThreadTests.java | 2 +-
.../io/influxdb/InfluxDBGenericRecordSinkTest.java | 6 +-
.../org/apache/pulsar/io/mongodb/MongoSink.java | 39 +-
.../apache/pulsar/io/mongodb/MongoSinkTest.java | 40 +-
.../pulsar/log4j2/appender/PulsarAppender.java | 1 +
.../pulsar/log4j2/appender/PulsarAppenderTest.java | 29 +-
pulsar-sql/presto-distribution/LICENSE | 3 +-
pulsar-sql/presto-distribution/pom.xml | 7 +
.../pulsar/sql/presto/TestPulsarConnector.java | 6 +-
.../pulsar/sql/presto/TestPulsarMetadata.java | 2 +-
.../pulsar/sql/presto/TestPulsarSplitManager.java | 6 +-
.../org/apache/pulsar/storm/PulsarSpoutTest.java | 31 +-
tiered-storage/jcloud/pom.xml | 5 +
.../jcloud/BlobStoreBackedInputStreamTest.java | 8 +-
.../impl/BlobStoreManagedLedgerOffloaderTest.java | 6 +-
.../impl/BlockAwareSegmentInputStreamTest.java | 6 +-
98 files changed, 755 insertions(+), 2022 deletions(-)
diff --git a/buildtools/pom.xml b/buildtools/pom.xml
index f895e81..a6f1daf 100644
--- a/buildtools/pom.xml
+++ b/buildtools/pom.xml
@@ -38,7 +38,7 @@
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
- <version>6.13.1</version>
+ <version>6.14.3</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
@@ -67,6 +67,13 @@
<header>../src/license-header.txt</header>
</configuration>
</plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ </configuration>
+ </plugin>
</plugins>
<extensions>
<extension>
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index f59e6c9..76c30cb 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -421,7 +421,7 @@ The Apache Software License, Version 2.0
* Google Error Prone Annotations - com.google.errorprone-error_prone_annotations-2.2.0.jar
* OkHttp - com.squareup.okhttp-okhttp-2.5.0.jar
* Okio - com.squareup.okio-okio-1.13.0.jar
- * Javassist -- org.javassist-javassist-3.21.0-GA.jar
+ * Javassist -- org.javassist-javassist-3.25.0-GA.jar
* gRPC
- io.grpc-grpc-all-1.18.0.jar
- io.grpc-grpc-auth-1.18.0.jar
@@ -455,7 +455,7 @@ The Apache Software License, Version 2.0
* Snappy Java
- org.xerial.snappy-snappy-java-1.1.1.3.jar
* Objenesis
- - org.objenesis-objenesis-2.1.jar
+ - org.objenesis-objenesis-2.6.jar
* Squareup
- com.squareup.okhttp-logging-interceptor-2.7.5.jar
- com.squareup.okhttp-okhttp-ws-2.7.5.jar
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java b/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java
index 30bcf46..7458573 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java
@@ -80,7 +80,11 @@ class PulsarMockReadHandle implements ReadHandle {
@Override
public long getLastAddConfirmed() {
- return entries.get(entries.size() - 1).getEntryId();
+ if (entries.isEmpty()) {
+ return -1;
+ } else {
+ return entries.get(entries.size() - 1).getEntryId();
+ }
}
@Override
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java
index c48ea24..aa2730c 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java
@@ -18,19 +18,24 @@
*/
package org.apache.bookkeeper.mledger.impl;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Mockito.*;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import io.netty.buffer.Unpooled;
+
import java.lang.reflect.Method;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
+
import org.apache.bookkeeper.client.BKException.BKNoSuchLedgerExistsException;
-import org.apache.bookkeeper.client.api.BKException;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
@@ -39,8 +44,6 @@ import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 72d19ce..4f27611 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -18,8 +18,8 @@
*/
package org.apache.bookkeeper.mledger.impl;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -95,9 +95,9 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.Ledge
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
-import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
+import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException.Code;
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
index 08e88ee..038cbe4 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
@@ -18,10 +18,10 @@
*/
package org.apache.bookkeeper.mledger.impl;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.anyMap;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.eq;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -32,7 +32,6 @@ import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
@@ -49,23 +48,16 @@ import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
-
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.testng.Assert;
import org.testng.annotations.Test;
public class OffloadPrefixReadTest extends MockedBookKeeperTestCase {
- private static final Logger log = LoggerFactory.getLogger(OffloadPrefixReadTest.class);
-
@Test
public void testOffloadRead() throws Exception {
MockLedgerOffloader offloader = spy(new MockLedgerOffloader());
@@ -101,21 +93,21 @@ public class OffloadPrefixReadTest extends MockedBookKeeperTestCase {
Assert.assertEquals(new String(e.getData()), "entry-" + i++);
}
verify(offloader, times(1))
- .readOffloaded(anyLong(), anyObject(), anyMap());
+ .readOffloaded(anyLong(), any(), anyMap());
verify(offloader).readOffloaded(anyLong(), eq(firstLedgerUUID), anyMap());
for (Entry e : cursor.readEntries(10)) {
Assert.assertEquals(new String(e.getData()), "entry-" + i++);
}
verify(offloader, times(2))
- .readOffloaded(anyLong(), anyObject(), anyMap());
+ .readOffloaded(anyLong(), any(), anyMap());
verify(offloader).readOffloaded(anyLong(), eq(secondLedgerUUID), anyMap());
for (Entry e : cursor.readEntries(5)) {
Assert.assertEquals(new String(e.getData()), "entry-" + i++);
}
verify(offloader, times(2))
- .readOffloaded(anyLong(), anyObject(), anyMap());
+ .readOffloaded(anyLong(), any(), anyMap());
}
static class MockLedgerOffloader implements LedgerOffloader {
diff --git a/pom.xml b/pom.xml
index 8c64e95..ef19d94 100644
--- a/pom.xml
+++ b/pom.xml
@@ -205,6 +205,10 @@ flexible messaging model and an intuitive client API.</description>
<disruptor.version>3.4.0</disruptor.version>
<testcontainers.version>1.11.2</testcontainers.version>
<kerby.version>1.1.1</kerby.version>
+ <testng.version>6.14.3</testng.version>
+ <mockito.version>2.28.2</mockito.version>
+ <powermock.version>2.0.2</powermock.version>
+ <javassist.version>3.25.0-GA</javassist.version>
<!-- Plugin dependencies -->
<protobuf-maven-plugin.version>0.6.1</protobuf-maven-plugin.version>
@@ -239,25 +243,25 @@ flexible messaging model and an intuitive client API.</description>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
- <version>6.14.3</version>
+ <version>${testng.version}</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
- <version>1.10.19</version>
+ <version>${mockito.version}</version>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
- <artifactId>powermock-api-mockito</artifactId>
- <version>1.7.3</version>
+ <artifactId>powermock-api-mockito2</artifactId>
+ <version>${powermock.version}</version>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-testng</artifactId>
- <version>1.7.3</version>
+ <version>${powermock.version}</version>
</dependency>
<dependency>
@@ -1003,6 +1007,11 @@ flexible messaging model and an intuitive client API.</description>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>${joda.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.javassist</groupId>
+ <artifactId>javassist</artifactId>
+ <version>${javassist.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
@@ -1030,7 +1039,7 @@ flexible messaging model and an intuitive client API.</description>
<dependency>
<groupId>org.powermock</groupId>
- <artifactId>powermock-api-mockito</artifactId>
+ <artifactId>powermock-api-mockito2</artifactId>
<scope>test</scope>
</dependency>
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
index e4b889e..72591c6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
@@ -18,7 +18,7 @@
*/
package org.apache.pulsar.broker.admin;
-import static org.mockito.Matchers.anyObject;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -29,33 +29,23 @@ import com.google.common.collect.Sets;
import java.util.concurrent.CompletableFuture;
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.LedgerOffloader;
+import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-@Slf4j
public class AdminApiOffloadTest extends MockedPulsarServiceBaseTest {
- private static final Logger LOG = LoggerFactory.getLogger(AdminApiOffloadTest.class);
-
@BeforeMethod
@Override
public void setup() throws Exception {
@@ -84,10 +74,10 @@ public class AdminApiOffloadTest extends MockedPulsarServiceBaseTest {
doReturn(offloader).when(pulsar).getManagedLedgerOffloader();
CompletableFuture<Void> promise = new CompletableFuture<>();
- doReturn(promise).when(offloader).offload(anyObject(), anyObject(), anyObject());
+ doReturn(promise).when(offloader).offload(any(), any(), any());
MessageId currentId = MessageId.latest;
- try (Producer p = pulsarClient.newProducer().topic(topicName).enableBatching(false).create()) {
+ try (Producer<byte[]> p = pulsarClient.newProducer().topic(topicName).enableBatching(false).create()) {
for (int i = 0; i < 15; i++) {
currentId = p.send("Foobar".getBytes());
}
@@ -120,7 +110,7 @@ public class AdminApiOffloadTest extends MockedPulsarServiceBaseTest {
// Try again
doReturn(CompletableFuture.completedFuture(null))
- .when(offloader).offload(anyObject(), anyObject(), anyObject());
+ .when(offloader).offload(any(), any(), any());
admin.topics().triggerOffload(topicName, currentId);
@@ -131,7 +121,7 @@ public class AdminApiOffloadTest extends MockedPulsarServiceBaseTest {
Assert.assertEquals(firstUnoffloaded.getLedgerId(), info.ledgers.get(1).ledgerId);
Assert.assertEquals(firstUnoffloaded.getEntryId(), 0);
- verify(offloader, times(2)).offload(anyObject(), anyObject(), anyObject());
+ verify(offloader, times(2)).offload(any(), any(), any());
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index c615a09..138404e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -19,7 +19,7 @@
package org.apache.pulsar.broker.admin;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
-import static org.mockito.Matchers.any;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
@@ -34,6 +34,9 @@ import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
import java.lang.reflect.Field;
import java.net.URI;
import java.net.URL;
@@ -69,26 +72,20 @@ import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TenantInfo;
-import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.ZooDefs;
-import org.hamcrest.Description;
-import org.hamcrest.Matcher;
+import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
@Test
public class NamespacesTest extends MockedPulsarServiceBaseTest {
@@ -560,26 +557,11 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
// setup to redirect to another broker in the same cluster
doReturn(Optional.of(new URL("http://otherhost" + ":" + BROKER_WEBSERVICE_PORT))).when(nsSvc)
- .getWebServiceUrl(Mockito.argThat(new Matcher<NamespaceName>() {
-
+ .getWebServiceUrl(Mockito.argThat(new ArgumentMatcher<NamespaceName>() {
@Override
- public void describeTo(Description description) {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public boolean matches(Object item) {
- NamespaceName nsname = (NamespaceName) item;
+ public boolean matches(NamespaceName nsname) {
return nsname.equals(NamespacesTest.this.testGlobalNamespaces.get(0));
}
-
- @Override
- public void _dont_implement_Matcher___instead_extend_BaseMatcher_() {
- // TODO Auto-generated method stub
-
- }
-
}), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyBoolean());
admin.namespaces().setNamespaceReplicationClusters(testGlobalNamespaces.get(0).toString(),
@@ -631,7 +613,8 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
// delete the topic from ZK
mockZookKeeper.delete("/managed-ledgers/" + topicName.getPersistenceNamingEncoding(), -1);
- ZkUtils.createFullPathOptimistic(mockZookKeeper, "/admin/partitioned-topics/" + topicName.getPersistenceNamingEncoding(),
+ ZkUtils.createFullPathOptimistic(mockZookKeeper,
+ "/admin/partitioned-topics/" + topicName.getPersistenceNamingEncoding(),
new byte[0], null, null);
try {
namespaces.deleteNamespace(testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), false);
@@ -675,67 +658,27 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
createBundledTestNamespaces(this.testTenant, this.testLocalCluster, bundledNsLocal, bundleData);
final NamespaceName testNs = NamespaceName.get(this.testTenant, this.testLocalCluster, bundledNsLocal);
- org.apache.pulsar.client.admin.Namespaces namespacesAdmin = mock(org.apache.pulsar.client.admin.Namespaces.class);
+ org.apache.pulsar.client.admin.Namespaces namespacesAdmin = mock(
+ org.apache.pulsar.client.admin.Namespaces.class);
doReturn(namespacesAdmin).when(admin).namespaces();
- doReturn(null).when(nsSvc).getWebServiceUrl(Mockito.argThat(new Matcher<NamespaceBundle>() {
-
- @Override
- public void describeTo(Description description) {
- }
-
- @Override
- public boolean matches(Object item) {
- if (item instanceof NamespaceBundle) {
- NamespaceBundle bundle = (NamespaceBundle) item;
- return bundle.getNamespaceObject().equals(testNs);
- }
- return false;
- }
-
+ doReturn(null).when(nsSvc).getWebServiceUrl(Mockito.argThat(new ArgumentMatcher<NamespaceBundle>() {
@Override
- public void _dont_implement_Matcher___instead_extend_BaseMatcher_() {
+ public boolean matches(NamespaceBundle bundle) {
+ return bundle.getNamespaceObject().equals(testNs);
}
-
}), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyBoolean());
- doReturn(false).when(nsSvc).isServiceUnitOwned(Mockito.argThat(new Matcher<NamespaceBundle>() {
-
- @Override
- public void describeTo(Description description) {
- }
-
+ doReturn(false).when(nsSvc).isServiceUnitOwned(Mockito.argThat(new ArgumentMatcher<NamespaceBundle>() {
@Override
- public boolean matches(Object item) {
- if (item instanceof NamespaceBundle) {
- NamespaceBundle bundle = (NamespaceBundle) item;
- return bundle.getNamespaceObject().equals(testNs);
- }
- return false;
+ public boolean matches(NamespaceBundle bundle) {
+ return bundle.getNamespaceObject().equals(testNs);
}
-
- @Override
- public void _dont_implement_Matcher___instead_extend_BaseMatcher_() {
- }
-
}));
doReturn(Optional.of(new NamespaceEphemeralData())).when(nsSvc)
- .getOwner(Mockito.argThat(new Matcher<NamespaceBundle>() {
-
+ .getOwner(Mockito.argThat(new ArgumentMatcher<NamespaceBundle>() {
@Override
- public void describeTo(Description description) {
- }
-
- @Override
- public boolean matches(Object item) {
- if (item instanceof NamespaceBundle) {
- NamespaceBundle bundle = (NamespaceBundle) item;
- return bundle.getNamespaceObject().equals(testNs);
- }
- return false;
- }
-
- @Override
- public void _dont_implement_Matcher___instead_extend_BaseMatcher_() {
+ public boolean matches(NamespaceBundle bundle) {
+ return bundle.getNamespaceObject().equals(testNs);
}
}));
@@ -760,7 +703,8 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
NamespaceBundles nsBundles = nsSvc.getNamespaceBundleFactory().getBundles(testNs, bundleData);
// make one bundle owned
- doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(nsBundles.getBundles().get(0), false, true, false);
+ doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(nsBundles.getBundles().get(0), false,
+ true, false);
doReturn(true).when(nsSvc).isServiceUnitOwned(nsBundles.getBundles().get(0));
doNothing().when(namespacesAdmin).deleteNamespaceBundle(
testTenant + "/" + testLocalCluster + "/" + bundledNsLocal, "0x00000000_0x80000000");
@@ -793,54 +737,8 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
final NamespaceName testNs = this.testLocalNamespaces.get(1);
URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress());
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc)
- .getWebServiceUrl(Mockito.argThat(new Matcher<NamespaceBundle>() {
-
- @Override
- public void describeTo(Description description) {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public boolean matches(Object item) {
- if (item instanceof NamespaceName) {
- NamespaceName ns = (NamespaceName) item;
- return ns.equals(testNs);
- }
- return false;
- }
-
- @Override
- public void _dont_implement_Matcher___instead_extend_BaseMatcher_() {
- // TODO Auto-generated method stub
-
- }
-
- }), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyBoolean());
- doReturn(true).when(nsSvc).isServiceUnitOwned(Mockito.argThat(new Matcher<NamespaceBundle>() {
-
- @Override
- public void describeTo(Description description) {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public boolean matches(Object item) {
- if (item instanceof NamespaceName) {
- NamespaceName ns = (NamespaceName) item;
- return ns.equals(testNs);
- }
- return false;
- }
-
- @Override
- public void _dont_implement_Matcher___instead_extend_BaseMatcher_() {
- // TODO Auto-generated method stub
-
- }
-
- }));
+ .getWebServiceUrl(Mockito.argThat(ns -> ns.equals(testNs)), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyBoolean());
+ doReturn(true).when(nsSvc).isServiceUnitOwned(Mockito.argThat(ns -> ns.equals(testNs)));
NamespaceBundle bundle = nsSvc.getNamespaceBundleFactory().getFullBundle(testNs);
doNothing().when(namespaces).validateBundleOwnership(bundle, false, true);
@@ -914,54 +812,10 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
final NamespaceName testNs = NamespaceName.get(this.testTenant, this.testLocalCluster, bundledNsLocal);
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc)
- .getWebServiceUrl(Mockito.argThat(new Matcher<NamespaceBundle>() {
-
- @Override
- public void describeTo(Description description) {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public boolean matches(Object item) {
- if (item instanceof NamespaceBundle) {
- NamespaceBundle bundle = (NamespaceBundle) item;
- return bundle.getNamespaceObject().equals(testNs);
- }
- return false;
- }
-
- @Override
- public void _dont_implement_Matcher___instead_extend_BaseMatcher_() {
- // TODO Auto-generated method stub
-
- }
-
- }), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyBoolean());
- doReturn(true).when(nsSvc).isServiceUnitOwned(Mockito.argThat(new Matcher<NamespaceBundle>() {
-
- @Override
- public void describeTo(Description description) {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public boolean matches(Object item) {
- if (item instanceof NamespaceBundle) {
- NamespaceBundle bundle = (NamespaceBundle) item;
- return bundle.getNamespaceObject().equals(testNs);
- }
- return false;
- }
-
- @Override
- public void _dont_implement_Matcher___instead_extend_BaseMatcher_() {
- // TODO Auto-generated method stub
-
- }
-
- }));
+ .getWebServiceUrl(Mockito.argThat(bundle -> bundle.getNamespaceObject().equals(testNs)),
+ Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyBoolean());
+ doReturn(true).when(nsSvc)
+ .isServiceUnitOwned(Mockito.argThat(bundle -> bundle.getNamespaceObject().equals(testNs)));
NamespaceBundles nsBundles = nsSvc.getNamespaceBundleFactory().getBundles(testNs, bundleData);
NamespaceBundle testBundle = nsBundles.getBundles().get(0);
@@ -1119,7 +973,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
doReturn("persistent").when(topics).domain();
topics.validateTopicName(topicName.getTenant(), topicName.getCluster(),
- topicName.getNamespacePortion(), topicName.getEncodedLocalName());
+ topicName.getNamespacePortion(), topicName.getEncodedLocalName());
topics.validateAdminOperationOnTopic(false);
}
@@ -1163,7 +1017,8 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
public void testSubscribeRate() throws Exception {
SubscribeRate subscribeRate = new SubscribeRate(1, 5);
String namespace = "my-tenants/my-namespace";
- admin.tenants().createTenant("my-tenants", new TenantInfo(Sets.newHashSet(), Sets.newHashSet(testLocalCluster)));
+ admin.tenants().createTenant("my-tenants",
+ new TenantInfo(Sets.newHashSet(), Sets.newHashSet(testLocalCluster)));
admin.namespaces().createNamespace(namespace, Sets.newHashSet(testLocalCluster));
admin.namespaces().setSubscribeRate(namespace, subscribeRate);
assertEquals(subscribeRate, admin.namespaces().getSubscribeRate(namespace));
@@ -1171,7 +1026,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
admin.topics().createPartitionedTopic(topicName, 2);
pulsar.getConfiguration().setAuthorizationEnabled(false);
- Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ Consumer<?> consumer = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionType(SubscriptionType.Shared)
.subscriptionName("subscribe-rate")
@@ -1202,43 +1057,9 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
private void mockWebUrl(URL localWebServiceUrl, NamespaceName namespace) throws Exception {
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc)
- .getWebServiceUrl(Mockito.argThat(new Matcher<NamespaceBundle>() {
- @Override
- public void describeTo(Description description) {
- }
-
- @Override
- public boolean matches(Object item) {
- if (item instanceof NamespaceBundle) {
- NamespaceBundle bundle = (NamespaceBundle) item;
- return bundle.getNamespaceObject().equals(namespace);
- }
- return false;
- }
-
- @Override
- public void _dont_implement_Matcher___instead_extend_BaseMatcher_() {
- }
- }), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyBoolean());
- doReturn(true).when(nsSvc).isServiceUnitOwned(Mockito.argThat(new Matcher<NamespaceBundle>() {
- @Override
- public void describeTo(Description description) {
- }
-
- @Override
- public boolean matches(Object item) {
- if (item instanceof NamespaceBundle) {
- NamespaceBundle bundle = (NamespaceBundle) item;
- return bundle.getNamespaceObject().equals(namespace);
- }
- return false;
- }
-
- @Override
- public void _dont_implement_Matcher___instead_extend_BaseMatcher_() {
- }
- }));
+ .getWebServiceUrl(Mockito.argThat(bundle -> bundle.getNamespaceObject().equals(namespace)),
+ Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyBoolean());
+ doReturn(true).when(nsSvc)
+ .isServiceUnitOwned(Mockito.argThat(bundle -> bundle.getNamespaceObject().equals(namespace)));
}
-
- private static final Logger log = LoggerFactory.getLogger(NamespacesTest.class);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthenticationServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthenticationServiceTest.java
index 6600566..f75fa54 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthenticationServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthenticationServiceTest.java
@@ -18,7 +18,7 @@
*/
package org.apache.pulsar.broker.auth;
-import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/ResourceQuotaCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/ResourceQuotaCacheTest.java
index d07fae3..6fb4286 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/ResourceQuotaCacheTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/ResourceQuotaCacheTest.java
@@ -18,7 +18,7 @@
*/
package org.apache.pulsar.broker.cache;
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
index fbfc4d2..6f2cdf0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
@@ -18,8 +18,6 @@
*/
package org.apache.pulsar.broker.delayed;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.*;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
@@ -112,9 +110,9 @@ public class InMemoryDeliveryTrackerTest {
NavigableMap<Long, TimerTask> tasks = new TreeMap<>();
when(timer.newTimeout(any(), anyLong(), any())).then(invocation -> {
- TimerTask task = invocation.getArgumentAt(0, TimerTask.class);
- long timeout = invocation.getArgumentAt(1, Long.class);
- TimeUnit unit = invocation.getArgumentAt(2, TimeUnit.class);
+ TimerTask task = invocation.getArgument(0, TimerTask.class);
+ long timeout = invocation.getArgument(1, Long.class);
+ TimeUnit unit = invocation.getArgument(2, TimeUnit.class);
long scheduleAt = clockTime.get() + unit.toMillis(timeout);
tasks.put(scheduleAt, task);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index 84f54a7..6d9d398 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -20,7 +20,7 @@ package org.apache.pulsar.broker.namespace;
import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath;
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
index d616e21..172f56d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
@@ -20,8 +20,7 @@ package org.apache.pulsar.broker.namespace;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.pulsar.broker.PulsarService.webAddress;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -87,7 +86,7 @@ public class OwnershipCacheTest {
bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32());
nsService = mock(NamespaceService.class);
brokerService = mock(BrokerService.class);
- doReturn(CompletableFuture.completedFuture(1)).when(brokerService).unloadServiceUnit(anyObject());
+ doReturn(CompletableFuture.completedFuture(1)).when(brokerService).unloadServiceUnit(any());
doReturn(zkCache).when(pulsar).getLocalZkCache();
doReturn(localCache).when(pulsar).getLocalZkCacheService();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 79d6c79..d0d5c28 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -20,7 +20,7 @@ package org.apache.pulsar.broker.service;
import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath;
-import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
@@ -787,7 +787,7 @@ public class BrokerServiceTest extends BrokerTestBase {
// create topic will fail to get managedLedgerConfig
CompletableFuture<ManagedLedgerConfig> failedManagedLedgerConfig = new CompletableFuture<>();
failedManagedLedgerConfig.completeExceptionally(new NullPointerException("failed to peristent policy"));
- doReturn(failedManagedLedgerConfig).when(service).getManagedLedgerConfig(anyObject());
+ doReturn(failedManagedLedgerConfig).when(service).getManagedLedgerConfig(any());
CompletableFuture<Void> topicCreation = new CompletableFuture<Void>();
@@ -830,7 +830,7 @@ public class BrokerServiceTest extends BrokerTestBase {
// create topic will fail to get managedLedgerConfig
CompletableFuture<ManagedLedgerConfig> failedManagedLedgerConfig = new CompletableFuture<>();
failedManagedLedgerConfig.complete(new ManagedLedgerConfig());
- doReturn(failedManagedLedgerConfig).when(service).getManagedLedgerConfig(anyObject());
+ doReturn(failedManagedLedgerConfig).when(service).getManagedLedgerConfig(any());
CompletableFuture<Void> topicCreation = new CompletableFuture<Void>();
// fail managed-ledger future
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index f3d1c970..ab96fd5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -20,10 +20,9 @@ package org.apache.pulsar.broker.service;
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.matches;
-import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.matches;
+import static org.mockito.Mockito.same;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -137,7 +136,7 @@ public class PersistentDispatcherFailoverConsumerTest {
consumerChanges = new LinkedBlockingQueue<>();
this.channelCtx = mock(ChannelHandlerContext.class);
doAnswer(invocationOnMock -> {
- ByteBuf buf = invocationOnMock.getArgumentAt(0, ByteBuf.class);
+ ByteBuf buf = invocationOnMock.getArgument(0);
ByteBuf cmdBuf = buf.retainedSlice(4, buf.writerIndex() - 4);
try {
@@ -203,7 +202,7 @@ public class PersistentDispatcherFailoverConsumerTest {
return null;
}
}).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class),
- any(OpenLedgerCallback.class), anyObject());
+ any(OpenLedgerCallback.class), any());
// call openLedgerFailed on ML factory asyncOpen
doAnswer(new Answer<Object>() {
@@ -214,7 +213,7 @@ public class PersistentDispatcherFailoverConsumerTest {
return null;
}
}).when(mlFactoryMock).asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
- any(OpenLedgerCallback.class), anyObject());
+ any(OpenLedgerCallback.class), any());
// call addComplete on ledger asyncAddEntry
doAnswer(new Answer<Object>() {
@@ -223,7 +222,7 @@ public class PersistentDispatcherFailoverConsumerTest {
((AddEntryCallback) invocationOnMock.getArguments()[1]).addComplete(new PositionImpl(1, 1), null);
return null;
}
- }).when(ledgerMock).asyncAddEntry(any(byte[].class), any(AddEntryCallback.class), anyObject());
+ }).when(ledgerMock).asyncAddEntry(any(byte[].class), any(AddEntryCallback.class), any());
// call openCursorComplete on cursor asyncOpen
doAnswer(new Answer<Object>() {
@@ -232,7 +231,7 @@ public class PersistentDispatcherFailoverConsumerTest {
((OpenCursorCallback) invocationOnMock.getArguments()[2]).openCursorComplete(cursorMock, null);
return null;
}
- }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(OpenCursorCallback.class), anyObject());
+ }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(OpenCursorCallback.class), any());
// call deleteLedgerComplete on ledger asyncDelete
doAnswer(new Answer<Object>() {
@@ -241,7 +240,7 @@ public class PersistentDispatcherFailoverConsumerTest {
((DeleteLedgerCallback) invocationOnMock.getArguments()[0]).deleteLedgerComplete(null);
return null;
}
- }).when(ledgerMock).asyncDelete(any(DeleteLedgerCallback.class), anyObject());
+ }).when(ledgerMock).asyncDelete(any(DeleteLedgerCallback.class), any());
doAnswer(new Answer<Object>() {
@Override
@@ -249,7 +248,7 @@ public class PersistentDispatcherFailoverConsumerTest {
((DeleteCursorCallback) invocationOnMock.getArguments()[1]).deleteCursorComplete(null);
return null;
}
- }).when(ledgerMock).asyncDeleteCursor(matches(".*success.*"), any(DeleteCursorCallback.class), anyObject());
+ }).when(ledgerMock).asyncDeleteCursor(matches(".*success.*"), any(DeleteCursorCallback.class), any());
}
private void verifyActiveConsumerChange(CommandActiveConsumerChange change,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
index 01895a8..d50a5b2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
@@ -19,7 +19,7 @@
package org.apache.pulsar.broker.service;
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 325a735..5d7bbaf 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -21,11 +21,10 @@ package org.apache.pulsar.broker.service;
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.matches;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.matches;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
@@ -165,7 +164,7 @@ public class PersistentTopicTest {
ZooKeeperDataCache<Policies> zkDataCache = mock(ZooKeeperDataCache.class);
doReturn(zkDataCache).when(configCacheService).policiesCache();
doReturn(configCacheService).when(pulsar).getConfigurationCache();
- doReturn(Optional.empty()).when(zkDataCache).get(anyString());
+ doReturn(Optional.empty()).when(zkDataCache).get(any());
LocalZooKeeperCacheService zkCache = mock(LocalZooKeeperCacheService.class);
doReturn(CompletableFuture.completedFuture(Optional.empty())).when(zkDataCache).getAsync(any());
@@ -183,8 +182,8 @@ public class PersistentTopicTest {
NamespaceService nsSvc = mock(NamespaceService.class);
doReturn(nsSvc).when(pulsar).getNamespaceService();
- doReturn(true).when(nsSvc).isServiceUnitOwned(any(NamespaceBundle.class));
- doReturn(true).when(nsSvc).isServiceUnitActive(any(TopicName.class));
+ doReturn(true).when(nsSvc).isServiceUnitOwned(any());
+ doReturn(true).when(nsSvc).isServiceUnitActive(any());
setupMLAsyncCallbackMocks();
}
@@ -214,7 +213,7 @@ public class PersistentTopicTest {
return null;
}
}).when(mlFactoryMock).asyncOpen(anyString(), any(ManagedLedgerConfig.class), any(OpenLedgerCallback.class),
- anyObject());
+ any());
CompletableFuture<Void> future = brokerService.getOrCreateTopic(topicName).thenAccept(topic -> {
assertTrue(topic.toString().contains(topicName));
@@ -245,7 +244,7 @@ public class PersistentTopicTest {
return null;
}
}).when(mlFactoryMock).asyncOpen(anyString(), any(ManagedLedgerConfig.class), any(OpenLedgerCallback.class),
- anyObject());
+ any());
CompletableFuture<Topic> future = brokerService.getOrCreateTopic(jinxedTopicName);
@@ -327,7 +326,7 @@ public class PersistentTopicTest {
new ManagedLedgerException("Managed ledger failure"), invocationOnMock.getArguments()[2]);
return null;
}
- }).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), any(AddEntryCallback.class), anyObject());
+ }).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), any(AddEntryCallback.class), any());
topic.publishMessage(payload, (exception, ledgerId, entryId) -> {
if (exception == null) {
@@ -705,7 +704,7 @@ public class PersistentTopicTest {
Thread.sleep(1000);
return null;
}
- }).when(ledgerMock).asyncDeleteCursor(matches(".*success.*"), any(DeleteCursorCallback.class), anyObject());
+ }).when(ledgerMock).asyncDeleteCursor(matches(".*success.*"), any(DeleteCursorCallback.class), any());
ExecutorService executor = Executors.newCachedThreadPool();
@@ -882,7 +881,7 @@ public class PersistentTopicTest {
((DeleteLedgerCallback) invocationOnMock.getArguments()[0]).deleteLedgerComplete(null);
return null;
}
- }).when(ledgerMock).asyncDelete(any(DeleteLedgerCallback.class), anyObject());
+ }).when(ledgerMock).asyncDelete(any(DeleteLedgerCallback.class), any());
ExecutorService executor = Executors.newCachedThreadPool();
@@ -960,7 +959,7 @@ public class PersistentTopicTest {
return null;
}
}).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class),
- any(OpenLedgerCallback.class), anyObject());
+ any(OpenLedgerCallback.class), any());
// call openLedgerFailed on ML factory asyncOpen
doAnswer(new Answer<Object>() {
@@ -971,7 +970,7 @@ public class PersistentTopicTest {
return null;
}
}).when(mlFactoryMock).asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
- any(OpenLedgerCallback.class), anyObject());
+ any(OpenLedgerCallback.class), any());
// call addComplete on ledger asyncAddEntry
doAnswer(new Answer<Object>() {
@@ -981,7 +980,7 @@ public class PersistentTopicTest {
invocationOnMock.getArguments()[2]);
return null;
}
- }).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), any(AddEntryCallback.class), anyObject());
+ }).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), any(AddEntryCallback.class), any());
// call openCursorComplete on cursor asyncOpen
doAnswer(new Answer<Object>() {
@@ -990,7 +989,7 @@ public class PersistentTopicTest {
((OpenCursorCallback) invocationOnMock.getArguments()[2]).openCursorComplete(cursorMock, null);
return null;
}
- }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(OpenCursorCallback.class), anyObject());
+ }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(OpenCursorCallback.class), any());
doAnswer(new Answer<Object>() {
@Override
@@ -999,7 +998,7 @@ public class PersistentTopicTest {
return null;
}
}).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(Map.class),
- any(OpenCursorCallback.class), anyObject());
+ any(OpenCursorCallback.class), any());
// call deleteLedgerComplete on ledger asyncDelete
doAnswer(new Answer<Object>() {
@@ -1008,7 +1007,7 @@ public class PersistentTopicTest {
((DeleteLedgerCallback) invocationOnMock.getArguments()[0]).deleteLedgerComplete(null);
return null;
}
- }).when(ledgerMock).asyncDelete(any(DeleteLedgerCallback.class), anyObject());
+ }).when(ledgerMock).asyncDelete(any(DeleteLedgerCallback.class), any());
doAnswer(new Answer<Object>() {
@Override
@@ -1016,13 +1015,13 @@ public class PersistentTopicTest {
((DeleteCursorCallback) invocationOnMock.getArguments()[1]).deleteCursorComplete(null);
return null;
}
- }).when(ledgerMock).asyncDeleteCursor(matches(".*success.*"), any(DeleteCursorCallback.class), anyObject());
+ }).when(ledgerMock).asyncDeleteCursor(matches(".*success.*"), any(DeleteCursorCallback.class), any());
doAnswer((invokactionOnMock) -> {
((MarkDeleteCallback) invokactionOnMock.getArguments()[2])
.markDeleteComplete(invokactionOnMock.getArguments()[3]);
return null;
- }).when(cursorMock).asyncMarkDelete(anyObject(), anyObject(), any(MarkDeleteCallback.class), anyObject());
+ }).when(cursorMock).asyncMarkDelete(any(), any(), any(MarkDeleteCallback.class), any());
}
@Test
@@ -1185,7 +1184,7 @@ public class PersistentTopicTest {
String localCluster = "local";
String remoteCluster = "remote";
final ManagedLedger ledgerMock = mock(ManagedLedger.class);
- doNothing().when(ledgerMock).asyncDeleteCursor(anyObject(), anyObject(), anyObject());
+ doNothing().when(ledgerMock).asyncDeleteCursor(any(), any(), any());
doReturn(new ArrayList<Object>()).when(ledgerMock).getCursors();
PersistentTopic topic = new PersistentTopic(globalTopicName, ledgerMock, brokerService);
@@ -1219,7 +1218,7 @@ public class PersistentTopicTest {
// step-3 : complete the callback to remove replicator from the list
ArgumentCaptor<DeleteCursorCallback> captor = ArgumentCaptor.forClass(DeleteCursorCallback.class);
- Mockito.verify(ledgerMock).asyncDeleteCursor(anyObject(), captor.capture(), anyObject());
+ Mockito.verify(ledgerMock).asyncDeleteCursor(any(), captor.capture(), any());
DeleteCursorCallback callback = captor.getValue();
callback.deleteCursorComplete(null);
}
@@ -1231,7 +1230,7 @@ public class PersistentTopicTest {
String localCluster = "local";
String remoteCluster = "remote";
final ManagedLedger ledgerMock = mock(ManagedLedger.class);
- doNothing().when(ledgerMock).asyncDeleteCursor(anyObject(), anyObject(), anyObject());
+ doNothing().when(ledgerMock).asyncDeleteCursor(any(), any(), any());
doReturn(new ArrayList<Object>()).when(ledgerMock).getCursors();
PersistentTopic topic = new PersistentTopic(globalTopicName, ledgerMock, brokerService);
@@ -1252,7 +1251,7 @@ public class PersistentTopicTest {
verify(clientImpl)
.createProducerAsync(
any(ProducerConfigurationData.class),
- any(Schema.class), eq(null)
+ any(), eq(null)
);
replicator.disconnect(false);
@@ -1260,11 +1259,7 @@ public class PersistentTopicTest {
replicator.startProducer();
- verify(clientImpl, Mockito.times(2))
- .createProducerAsync(
- any(ProducerConfigurationData.class),
- any(Schema.class), any(null)
- );
+ verify(clientImpl, Mockito.times(2)).createProducerAsync(any(), any(), any());
}
@Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 4a65b93..6f7731b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -18,13 +18,18 @@
*/
package org.apache.pulsar.broker.service;
-import static org.mockito.Matchers.eq;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+import com.google.common.collect.Sets;
+import com.scurrilous.circe.checksum.Crc32cIntChecksum;
+
+import io.netty.buffer.ByteBuf;
+
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.List;
@@ -75,11 +80,6 @@ import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import org.testng.collections.Lists;
-import com.google.common.collect.Sets;
-import com.scurrilous.circe.checksum.Crc32cIntChecksum;
-
-import io.netty.buffer.ByteBuf;
-
/**
* Starts 3 brokers that are in 3 different clusters
*/
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 217b6f1..07b619c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -21,9 +21,8 @@ package org.apache.pulsar.broker.service;
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.matches;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.matches;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
@@ -100,7 +99,6 @@ import org.apache.pulsar.common.api.proto.PulsarApi.EncryptionKeys;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
-import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.Policies;
@@ -168,7 +166,7 @@ public class ServerCnxTest {
configCacheService = mock(ConfigurationCacheService.class);
ZooKeeperDataCache<Policies> zkDataCache = mock(ZooKeeperDataCache.class);
- doReturn(Optional.empty()).when(zkDataCache).get(anyObject());
+ doReturn(Optional.empty()).when(zkDataCache).get(any());
doReturn(zkDataCache).when(configCacheService).policiesCache();
doReturn(configCacheService).when(pulsar).getConfigurationCache();
@@ -184,8 +182,8 @@ public class ServerCnxTest {
namespaceService = mock(NamespaceService.class);
doReturn(namespaceService).when(pulsar).getNamespaceService();
- doReturn(true).when(namespaceService).isServiceUnitOwned(any(NamespaceBundle.class));
- doReturn(true).when(namespaceService).isServiceUnitActive(any(TopicName.class));
+ doReturn(true).when(namespaceService).isServiceUnitOwned(any());
+ doReturn(true).when(namespaceService).isServiceUnitActive(any());
setupMLAsyncCallbackMocks();
@@ -756,7 +754,7 @@ public class ServerCnxTest {
});
return null;
}).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class),
- any(OpenLedgerCallback.class), anyObject());
+ any(OpenLedgerCallback.class), any());
// In a create producer timeout from client side we expect to see this sequence of commands :
// 1. create producer
@@ -815,7 +813,7 @@ public class ServerCnxTest {
return null;
}
}).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class),
- any(OpenLedgerCallback.class), anyObject());
+ any(OpenLedgerCallback.class), any());
// In a create producer timeout from client side we expect to see this sequence of commands :
// 1. create producer
@@ -893,7 +891,7 @@ public class ServerCnxTest {
});
return null;
}).when(mlFactoryMock).asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
- any(OpenLedgerCallback.class), anyObject());
+ any(OpenLedgerCallback.class), any());
// In a create producer timeout from client side we expect to see this sequence of commands :
// 1. create a failure producer which will timeout creation after 100msec
@@ -964,7 +962,7 @@ public class ServerCnxTest {
return null;
}).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class),
- any(OpenLedgerCallback.class), anyObject());
+ any(OpenLedgerCallback.class), any());
// In a subscribe timeout from client side we expect to see this sequence of commands :
// 1. Subscribe
@@ -1037,7 +1035,7 @@ public class ServerCnxTest {
});
return null;
}).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class),
- any(OpenLedgerCallback.class), anyObject());
+ any(OpenLedgerCallback.class), any());
CompletableFuture<Runnable> openTopicFail = new CompletableFuture<>();
doAnswer(invocationOnMock -> {
@@ -1047,7 +1045,7 @@ public class ServerCnxTest {
});
return null;
}).when(mlFactoryMock).asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
- any(OpenLedgerCallback.class), anyObject());
+ any(OpenLedgerCallback.class), any());
// In a subscribe timeout from client side we expect to see this sequence of commands :
// 1. Subscribe against failtopic which will fail after 100msec
@@ -1444,7 +1442,7 @@ public class ServerCnxTest {
return null;
}
}).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class),
- any(OpenLedgerCallback.class), anyObject());
+ any(OpenLedgerCallback.class), any());
// call openLedgerFailed on ML factory asyncOpen
doAnswer(new Answer<Object>() {
@@ -1459,7 +1457,7 @@ public class ServerCnxTest {
return null;
}
}).when(mlFactoryMock).asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
- any(OpenLedgerCallback.class), anyObject());
+ any(OpenLedgerCallback.class), any());
// call addComplete on ledger asyncAddEntry
doAnswer(new Answer<Object>() {
@@ -1469,7 +1467,7 @@ public class ServerCnxTest {
invocationOnMock.getArguments()[2]);
return null;
}
- }).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), any(AddEntryCallback.class), anyObject());
+ }).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), any(AddEntryCallback.class), any());
doAnswer(new Answer<Object>() {
@Override
@@ -1478,7 +1476,7 @@ public class ServerCnxTest {
((OpenCursorCallback) invocationOnMock.getArguments()[2]).openCursorComplete(cursorMock, null);
return null;
}
- }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(OpenCursorCallback.class), anyObject());
+ }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(OpenCursorCallback.class), any());
doAnswer(new Answer<Object>() {
@Override
@@ -1488,7 +1486,7 @@ public class ServerCnxTest {
return null;
}
}).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(Map.class),
- any(OpenCursorCallback.class), anyObject());
+ any(OpenCursorCallback.class), any());
doAnswer(new Answer<Object>() {
@Override
@@ -1498,7 +1496,7 @@ public class ServerCnxTest {
.openCursorFailed(new ManagedLedgerException("Managed ledger failure"), null);
return null;
}
- }).when(ledgerMock).asyncOpenCursor(matches(".*fail.*"), any(InitialPosition.class), any(OpenCursorCallback.class), anyObject());
+ }).when(ledgerMock).asyncOpenCursor(matches(".*fail.*"), any(InitialPosition.class), any(OpenCursorCallback.class), any());
doAnswer(new Answer<Object>() {
@Override
@@ -1509,7 +1507,7 @@ public class ServerCnxTest {
return null;
}
}).when(ledgerMock).asyncOpenCursor(matches(".*fail.*"), any(InitialPosition.class), any(Map.class),
- any(OpenCursorCallback.class), anyObject());
+ any(OpenCursorCallback.class), any());
doAnswer(new Answer<Object>() {
@Override
@@ -1517,7 +1515,7 @@ public class ServerCnxTest {
((DeleteCursorCallback) invocationOnMock.getArguments()[1]).deleteCursorComplete(null);
return null;
}
- }).when(ledgerMock).asyncDeleteCursor(matches(".*success.*"), any(DeleteCursorCallback.class), anyObject());
+ }).when(ledgerMock).asyncDeleteCursor(matches(".*success.*"), any(DeleteCursorCallback.class), any());
doAnswer(new Answer<Object>() {
@Override
@@ -1526,7 +1524,7 @@ public class ServerCnxTest {
.deleteCursorFailed(new ManagedLedgerException("Managed ledger failure"), null);
return null;
}
- }).when(ledgerMock).asyncDeleteCursor(matches(".*fail.*"), any(DeleteCursorCallback.class), anyObject());
+ }).when(ledgerMock).asyncDeleteCursor(matches(".*fail.*"), any(DeleteCursorCallback.class), any());
doAnswer(new Answer<Object>() {
@Override
@@ -1534,7 +1532,7 @@ public class ServerCnxTest {
((CloseCallback) invocationOnMock.getArguments()[0]).closeComplete(null);
return null;
}
- }).when(cursorMock).asyncClose(any(CloseCallback.class), anyObject());
+ }).when(cursorMock).asyncClose(any(CloseCallback.class), any());
doReturn(successSubName).when(cursorMock).getName();
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
index 9e65326..2d1d092 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
@@ -18,8 +18,30 @@
*/
package org.apache.pulsar.broker.service.persistent;
+import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
+import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
import org.apache.bookkeeper.mledger.AsyncCallbacks;
-import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.Position;
@@ -33,8 +55,8 @@ import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.PersistentTopicTest;
-import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.transaction.common.exception.TransactionConflictException;
@@ -49,32 +71,6 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-
-import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
-import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
-import static org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyList;
-import static org.mockito.Matchers.anyMap;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-
@PrepareForTest({ ZooKeeperDataCache.class, BrokerService.class })
public class PersistentSubscriptionTest {
@@ -170,14 +166,14 @@ public class PersistentSubscriptionTest {
((AsyncCallbacks.DeleteCallback) invocationOnMock.getArguments()[1])
.deleteComplete(invocationOnMock.getArguments()[2]);
return null;
- }).when(cursorMock).asyncDelete(anyList(), any(AsyncCallbacks.DeleteCallback.class), anyObject());
+ }).when(cursorMock).asyncDelete(any(List.class), any(AsyncCallbacks.DeleteCallback.class), any());
doAnswer((invocationOnMock) -> {
assertTrue(((PositionImpl)invocationOnMock.getArguments()[0]).compareTo(new PositionImpl(3, 100)) == 0);
((AsyncCallbacks.MarkDeleteCallback) invocationOnMock.getArguments()[2])
.markDeleteComplete(invocationOnMock.getArguments()[3]);
return null;
- }).when(cursorMock).asyncMarkDelete(anyObject(), anyObject(), any(AsyncCallbacks.MarkDeleteCallback.class), anyObject());
+ }).when(cursorMock).asyncMarkDelete(any(), any(), any(AsyncCallbacks.MarkDeleteCallback.class), any());
List<Position> positions = new ArrayList<>();
positions.add(new PositionImpl(1, 1));
@@ -197,8 +193,8 @@ public class PersistentSubscriptionTest {
persistentSubscription.commitTxn(txnID1, Collections.emptyMap());
// Verify corresponding ledger method was called with expected args.
- verify(cursorMock, times(1)).asyncDelete(anyList(), any(), any());
- verify(cursorMock, times(1)).asyncMarkDelete(any(), anyMap(), anyObject(), any());
+ verify(cursorMock, times(1)).asyncDelete(any(List.class), any(), any());
+ verify(cursorMock, times(1)).asyncMarkDelete(any(), any(Map.class), any(), any());
}
@Test
@@ -216,7 +212,7 @@ public class PersistentSubscriptionTest {
((AsyncCallbacks.DeleteCallback) invocationOnMock.getArguments()[1])
.deleteComplete(invocationOnMock.getArguments()[2]);
return null;
- }).when(cursorMock).asyncDelete(anyList(), any(AsyncCallbacks.DeleteCallback.class), anyObject());
+ }).when(cursorMock).asyncDelete(any(List.class), any(AsyncCallbacks.DeleteCallback.class), any());
doReturn(PulsarApi.CommandSubscribe.SubType.Exclusive).when(consumerMock).subType();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilderTest.java
index b4d1521..0d5e351 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilderTest.java
@@ -18,7 +18,7 @@
*/
package org.apache.pulsar.broker.service.persistent;
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -67,7 +67,7 @@ public class ReplicatedSubscriptionsSnapshotBuilderTest {
controller = mock(ReplicatedSubscriptionsController.class);
when(controller.localCluster()).thenReturn(localCluster);
doAnswer(invocation -> {
- ByteBuf marker = invocation.getArgumentAt(0, ByteBuf.class);
+ ByteBuf marker = invocation.getArgument(0, ByteBuf.class);
Commands.skipMessageMetadata(marker);
markers.add(marker);
return null;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidatorTest.java
index aa1cfbb..885a9e9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidatorTest.java
@@ -18,9 +18,9 @@
*/
package org.apache.pulsar.broker.service.schema.validator;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.same;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index 49206ff..51459fc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -19,7 +19,7 @@
package org.apache.pulsar.client.api;
import static org.apache.bookkeeper.test.PortManager.nextFreePort;
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 130a35b..23b6f80 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -19,7 +19,7 @@
package org.apache.pulsar.client.api;
import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
index f437eda..8fdc5ed 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
@@ -18,7 +18,7 @@
*/
package org.apache.pulsar.client.api.v1;
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index c89fd20..e0aec98 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -20,7 +20,7 @@ package org.apache.pulsar.client.impl;
import static java.util.UUID.randomUUID;
import static org.apache.pulsar.broker.service.BrokerService.BROKER_SERVICE_CONFIGURATION_PATH;
-import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
@@ -153,7 +153,7 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase {
doAnswer(invocationOnMock -> {
cons1.connectionClosed((ClientCnx) invocationOnMock.getArguments()[0]);
return null;
- }).when(consumer1).connectionClosed(anyObject());
+ }).when(consumer1).connectionClosed(any());
ProducerImpl<byte[]> producer1 = spy(prod1);
doAnswer(invocationOnMock -> prod1.getState()).when(producer1).getState();
doAnswer(invocationOnMock -> prod1.getClientCnx()).when(producer1).getClientCnx();
@@ -161,7 +161,7 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase {
doAnswer(invocationOnMock -> {
prod1.connectionClosed((ClientCnx) invocationOnMock.getArguments()[0]);
return null;
- }).when(producer1).connectionClosed(anyObject());
+ }).when(producer1).connectionClosed(any());
ProducerImpl<byte[]> producer2 = spy(prod2);
doAnswer(invocationOnMock -> prod2.getState()).when(producer2).getState();
doAnswer(invocationOnMock -> prod2.getClientCnx()).when(producer2).getClientCnx();
@@ -169,7 +169,7 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase {
doAnswer(invocationOnMock -> {
prod2.connectionClosed((ClientCnx) invocationOnMock.getArguments()[0]);
return null;
- }).when(producer2).connectionClosed(anyObject());
+ }).when(producer2).connectionClosed(any());
ClientCnx clientCnx = producer1.getClientCnx();
@@ -200,11 +200,11 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase {
// let server send signal to close-connection and client close the connection
Thread.sleep(1000);
// [1] Verify: producer1 must get connectionClosed signal
- verify(producer1, atLeastOnce()).connectionClosed(anyObject());
+ verify(producer1, atLeastOnce()).connectionClosed(any());
// [2] Verify: consumer1 must get connectionClosed signal
- verify(consumer1, atLeastOnce()).connectionClosed(anyObject());
+ verify(consumer1, atLeastOnce()).connectionClosed(any());
// [3] Verify: producer2 should have not received connectionClosed signal
- verify(producer2, never()).connectionClosed(anyObject());
+ verify(producer2, never()).connectionClosed(any());
// sleep for sometime to let other disconnected producer and consumer connect again: but they should not get
// connected with same broker as that broker is already out from active-broker list
@@ -224,7 +224,7 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase {
pulsar.getNamespaceService().unloadNamespaceBundle((NamespaceBundle) bundle2);
// let producer2 give some time to get disconnect signal and get disconencted
Thread.sleep(200);
- verify(producer2, atLeastOnce()).connectionClosed(anyObject());
+ verify(producer2, atLeastOnce()).connectionClosed(any());
// producer1 must not be able to connect again
assertTrue(prod1.getClientCnx() == null);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundleTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundleTest.java
index 064d357..e7a528c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundleTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundleTest.java
@@ -18,7 +18,7 @@
*/
package org.apache.pulsar.common.naming;
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java
index a8c504d..7ef0e32 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java
@@ -18,7 +18,7 @@
*/
package org.apache.pulsar.common.naming;
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -169,7 +169,7 @@ public class NamespaceBundlesTest {
bundles = new NamespaceBundles(topicName.getNamespaceObject(), newPar, factory);
bundles.findBundle(topicName);
fail("Should have failed due to out-of-range");
- } catch (ArrayIndexOutOfBoundsException iae) {
+ } catch (IndexOutOfBoundsException iae) {
// OK, expected
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 1e68f80..5437327 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -18,7 +18,7 @@
*/
package org.apache.pulsar.compaction;
-import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertNull;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
index aba83a7..41782a4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
@@ -18,7 +18,7 @@
*/
package org.apache.pulsar.io;
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
index 1ded3c6..bf19a18 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
@@ -18,9 +18,29 @@
*/
package org.apache.kafka.clients.producer;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.anyVararg;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
+
import org.apache.avro.reflect.Nullable;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.serialization.StringSerializer;
@@ -35,8 +55,6 @@ import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -45,25 +63,6 @@ import org.testng.IObjectFactory;
import org.testng.annotations.ObjectFactory;
import org.testng.annotations.Test;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.anyVararg;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
@PrepareForTest({PulsarClientKafkaConfig.class, PulsarProducerKafkaConfig.class})
@PowerMockIgnore({"org.apache.logging.log4j.*", "org.apache.kafka.clients.producer.ProducerInterceptor"})
public class PulsarKafkaProducerTest {
@@ -96,26 +95,20 @@ public class PulsarKafkaProducerTest {
public void testPulsarKafkaProducer() {
ClientBuilder mockClientBuilder = mock(ClientBuilder.class);
ProducerBuilder mockProducerBuilder = mock(ProducerBuilder.class);
- doAnswer(new Answer() {
- @Override
- public Object answer(InvocationOnMock invocation) throws Throwable {
- Assert.assertEquals((int)invocation.getArguments()[0], 1000000, "Send time out is suppose to be 1000.");
- return mockProducerBuilder;
- }
+ doAnswer(invocation -> {
+ Assert.assertEquals((int)invocation.getArguments()[0], 1000000, "Send time out is suppose to be 1000.");
+ return mockProducerBuilder;
}).when(mockProducerBuilder).sendTimeout(anyInt(), any(TimeUnit.class));
doReturn(mockClientBuilder).when(mockClientBuilder).serviceUrl(anyString());
- doAnswer(new Answer() {
- @Override
- public Object answer(InvocationOnMock invocation) throws Throwable {
- Assert.assertEquals((int)invocation.getArguments()[0], 1000, "Keep alive interval is suppose to be 1000.");
- return mockClientBuilder;
- }
+ doAnswer(invocation -> {
+ Assert.assertEquals((int)invocation.getArguments()[0], 1000, "Keep alive interval is suppose to be 1000.");
+ return mockClientBuilder;
}).when(mockClientBuilder).keepAliveInterval(anyInt(), any(TimeUnit.class));
PowerMockito.mockStatic(PulsarClientKafkaConfig.class);
PowerMockito.mockStatic(PulsarProducerKafkaConfig.class);
when(PulsarClientKafkaConfig.getClientBuilder(any(Properties.class))).thenReturn(mockClientBuilder);
- when(PulsarProducerKafkaConfig.getProducerBuilder(any(PulsarClient.class), any(Properties.class))).thenReturn(mockProducerBuilder);
+ when(PulsarProducerKafkaConfig.getProducerBuilder(any(), any())).thenReturn(mockProducerBuilder);
Properties properties = new Properties();
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapperTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapperTest.java
index 0f15691..f96c9c7 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapperTest.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapperTest.java
@@ -48,7 +48,7 @@ import org.testng.annotations.Test;
import java.util.Arrays;
import java.util.Random;
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
index c1e40fb..e3bae43 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
@@ -19,9 +19,9 @@
package org.apache.pulsar.admin.cli;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index f028636..a296857 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -19,12 +19,14 @@
package org.apache.pulsar.admin.cli;
import static org.junit.Assert.assertNull;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.longThat;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
-import static org.mockito.Mockito.times;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -45,10 +47,10 @@ import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.NonPersistentTopics;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.ResourceQuotas;
+import org.apache.pulsar.client.admin.Schemas;
import org.apache.pulsar.client.admin.Tenants;
import org.apache.pulsar.client.admin.Topics;
import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl;
-import org.apache.pulsar.client.admin.Schemas;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
@@ -69,7 +71,6 @@ import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.mockito.ArgumentMatcher;
-import org.mockito.Matchers;
import org.mockito.Mockito;
import org.testng.annotations.Test;
@@ -98,7 +99,7 @@ public class PulsarAdminToolTest {
brokers.run(split("update-dynamic-config --config brokerShutdownTimeoutMs --value 100"));
verify(mockBrokers).updateDynamicConfiguration("brokerShutdownTimeoutMs", "100");
-
+
brokers.run(split("delete-dynamic-config --config brokerShutdownTimeoutMs"));
verify(mockBrokers).deleteDynamicConfiguration("brokerShutdownTimeoutMs");
@@ -288,13 +289,13 @@ public class PulsarAdminToolTest {
.run(split("set-bookie-affinity-group myprop/clust/ns1 --primary-group test1 --secondary-group test2"));
verify(mockNamespaces).setBookieAffinityGroup("myprop/clust/ns1",
new BookieAffinityGroupData("test1", "test2"));
-
+
namespaces.run(split("get-bookie-affinity-group myprop/clust/ns1"));
verify(mockNamespaces).getBookieAffinityGroup("myprop/clust/ns1");
-
+
namespaces.run(split("delete-bookie-affinity-group myprop/clust/ns1"));
verify(mockNamespaces).deleteBookieAffinityGroup("myprop/clust/ns1");
-
+
namespaces.run(split("unload myprop/clust/ns1"));
verify(mockNamespaces).unload("myprop/clust/ns1");
@@ -653,10 +654,9 @@ public class PulsarAdminToolTest {
// argument matcher for the timestamp in reset cursor. Since we can't verify exact timestamp, we check for a
// range of +/- 1 second of the expected timestamp
- class TimestampMatcher extends ArgumentMatcher<Long> {
+ class TimestampMatcher implements ArgumentMatcher<Long> {
@Override
- public boolean matches(Object argument) {
- long timestamp = (Long) argument;
+ public boolean matches(Long timestamp) {
long expectedTimestamp = System.currentTimeMillis() - (1 * 60 * 1000);
if (timestamp < (expectedTimestamp + 1000) && timestamp > (expectedTimestamp - 1000)) {
return true;
@@ -665,8 +665,8 @@ public class PulsarAdminToolTest {
}
}
cmdTopics.run(split("reset-cursor persistent://myprop/clust/ns1/ds1 -s sub1 -t 1m"));
- verify(mockTopics).resetCursor(Matchers.eq("persistent://myprop/clust/ns1/ds1"), Matchers.eq("sub1"),
- Matchers.longThat(new TimestampMatcher()));
+ verify(mockTopics).resetCursor(eq("persistent://myprop/clust/ns1/ds1"), eq("sub1"),
+ longThat(new TimestampMatcher()));
}
@Test
@@ -736,10 +736,9 @@ public class PulsarAdminToolTest {
// argument matcher for the timestamp in reset cursor. Since we can't verify exact timestamp, we check for a
// range of +/- 1 second of the expected timestamp
- class TimestampMatcher extends ArgumentMatcher<Long> {
+ class TimestampMatcher implements ArgumentMatcher<Long> {
@Override
- public boolean matches(Object argument) {
- long timestamp = (Long) argument;
+ public boolean matches(Long timestamp) {
long expectedTimestamp = System.currentTimeMillis() - (1 * 60 * 1000);
if (timestamp < (expectedTimestamp + 1000) && timestamp > (expectedTimestamp - 1000)) {
return true;
@@ -748,11 +747,10 @@ public class PulsarAdminToolTest {
}
}
topics.run(split("reset-cursor persistent://myprop/clust/ns1/ds1 -s sub1 -t 1m"));
- verify(mockTopics).resetCursor(Matchers.eq("persistent://myprop/clust/ns1/ds1"), Matchers.eq("sub1"),
- Matchers.longThat(new TimestampMatcher()));
+ verify(mockTopics).resetCursor(eq("persistent://myprop/clust/ns1/ds1"), eq("sub1"),
+ longThat(new TimestampMatcher()));
}
-
@Test
void nonPersistentTopics() throws Exception {
PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
@@ -844,7 +842,7 @@ public class PulsarAdminToolTest {
assertNull(atuh.getCertFilePath());
assertNull(atuh.getKeyFilePath());
}
-
+
String[] split(String s) {
return s.split(" ");
}
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
index 2a48586..f0418c9 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
@@ -20,7 +20,7 @@ package org.apache.pulsar.admin.cli;
import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
-import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
@@ -251,7 +251,7 @@ public class TestCmdSinks {
sinkConfig
);
}
-
+
@Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Sink archive not specfied")
public void testMissingArchive() throws Exception {
SinkConfig sinkConfig = getSinkConfig();
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
index da6e99f..cbf121d 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
@@ -20,7 +20,7 @@ package org.apache.pulsar.admin.cli;
import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
-import static org.mockito.Matchers.eq;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
@@ -33,16 +33,14 @@ import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
import java.io.File;
import java.nio.file.Files;
-import lombok.extern.slf4j.Slf4j;
-
import org.apache.pulsar.admin.cli.utils.CmdUtils;
-import org.apache.pulsar.client.admin.Sources;
import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.Sources;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.functions.UpdateOptions;
import org.apache.pulsar.common.io.SourceConfig;
-import org.apache.pulsar.functions.utils.*;
+import org.apache.pulsar.functions.utils.FunctionCommon;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -52,7 +50,6 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.ObjectFactory;
import org.testng.annotations.Test;
-@Slf4j
@PrepareForTest({CmdFunctions.class})
@PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.io.core.*" })
public class TestCmdSources {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index dea384f..3398d36 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -156,6 +156,11 @@ public class MessageImpl<T> implements Message<T> {
}
public MessageImpl(String topic, String msgId, Map<String, String> properties,
+ byte[] payload, Schema<T> schema) {
+ this(topic, msgId, properties, Unpooled.wrappedBuffer(payload), schema);
+ }
+
+ public MessageImpl(String topic, String msgId, Map<String, String> properties,
ByteBuf payload, Schema<T> schema) {
String[] data = msgId.split(":");
long ledgerId = Long.parseLong(data[0]);
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/api/MessageRouterTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/api/MessageRouterTest.java
index 5da9f6e..48f5816 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/api/MessageRouterTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/api/MessageRouterTest.java
@@ -18,7 +18,7 @@
*/
package org.apache.pulsar.client.api;
-import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
index c969374..ab73b67 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
@@ -18,7 +18,7 @@
*/
package org.apache.pulsar.client.impl;
-import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertTrue;
@@ -47,8 +47,8 @@ public class ClientCnxTest {
ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
ChannelFuture listenerFuture = mock(ChannelFuture.class);
- when(listenerFuture.addListener(anyObject())).thenReturn(listenerFuture);
- when(ctx.writeAndFlush(anyObject())).thenReturn(listenerFuture);
+ when(listenerFuture.addListener(any())).thenReturn(listenerFuture);
+ when(ctx.writeAndFlush(any())).thenReturn(listenerFuture);
Field ctxField = PulsarHandler.class.getDeclaredField("ctx");
ctxField.setAccessible(true);
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
index 5475050..e253ace 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
@@ -34,7 +34,7 @@ import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
import org.testng.Assert;
import org.testng.annotations.Test;
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.powermock.api.mockito.PowerMockito.when;
import static org.testng.Assert.assertEquals;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
index d6877c1..862237a 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.client.impl;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
-import org.mockito.Matchers;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
@@ -29,7 +28,8 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertNotNull;
@@ -51,7 +51,7 @@ public class ProducerBuilderImplTest {
when(client.newProducer()).thenReturn(producerBuilderImpl);
when(client.createProducerAsync(
- Matchers.any(ProducerConfigurationData.class), Matchers.any(Schema.class), eq(null)))
+ any(ProducerConfigurationData.class), any(Schema.class), eq(null)))
.thenReturn(CompletableFuture.completedFuture(producer));
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningAvroSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningAvroSchemaTest.java
index 2ee5797..1b3ef52 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningAvroSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningAvroSchemaTest.java
@@ -25,7 +25,7 @@ import org.apache.pulsar.common.schema.SchemaInfo;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.powermock.api.mockito.PowerMockito.when;
import static org.testng.Assert.assertEquals;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningKeyValueSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningKeyValueSchemaTest.java
index 375baf5..ebcc00d 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningKeyValueSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningKeyValueSchemaTest.java
@@ -26,7 +26,7 @@ import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.testng.Assert;
import org.testng.annotations.Test;
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.powermock.api.mockito.PowerMockito.when;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchemaTest.java
index 9aefac4..814975d 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchemaTest.java
@@ -28,9 +28,9 @@ import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
-import static org.powermock.api.mockito.PowerMockito.when;
+import static org.mockito.Mockito.when;
public class GenericAvroSchemaTest {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java
index 79c2486..74133f4 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java
@@ -19,7 +19,7 @@
package org.apache.pulsar.client.impl.schema.generic;
import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProviderTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProviderTest.java
index ed14580..2e101b1 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProviderTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProviderTest.java
@@ -18,12 +18,14 @@
*/
package org.apache.pulsar.client.impl.schema.generic;
-import static org.mockito.Matchers.any;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
-import org.apache.pulsar.client.api.schema.GenericSchema;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.PulsarClientImpl;
@@ -34,9 +36,6 @@ import org.apache.pulsar.common.schema.SchemaInfo;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-
/**
* Unit test for {@link MultiVersionSchemaInfoProvider}.
*/
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/util/ObjectCacheTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/util/ObjectCacheTest.java
index 771e387..f0c2cbc 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/util/ObjectCacheTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/util/ObjectCacheTest.java
@@ -37,13 +37,14 @@ public class ObjectCacheTest {
AtomicLong currentTime = new AtomicLong(0);
Clock clock = mock(Clock.class);
- when(clock.millis()).then(invocation -> currentTime);
+ when(clock.millis()).then(invocation -> currentTime.longValue());
AtomicInteger currentValue = new AtomicInteger(0);
Supplier<Integer> cache = new ObjectCache<>(() -> currentValue.getAndIncrement(),
10, TimeUnit.MILLISECONDS, clock);
+ cache.get();
assertEquals(cache.get().intValue(), 0);
assertEquals(cache.get().intValue(), 0);
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java
index fdbbc9f..f458c5d 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java
@@ -28,6 +28,7 @@ import org.testng.annotations.Test;
@Test
public class TopicNameTest {
+ @SuppressWarnings("deprecation")
@Test
void topic() {
try {
@@ -215,6 +216,7 @@ public class TopicNameTest {
assertEquals(name.getPersistenceNamingEncoding(), "prop/colo/ns/persistent/" + encodedName);
}
+ @SuppressWarnings("deprecation")
@Test
public void testTopicNameWithoutCluster() throws Exception {
TopicName topicName = TopicName.get("persistent://tenant/namespace/topic");
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/ByteBufPairTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/ByteBufPairTest.java
index 548b7be..b17f50a 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/ByteBufPairTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/ByteBufPairTest.java
@@ -18,7 +18,7 @@
*/
package org.apache.pulsar.common.protocol;
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/PulsarDecoderTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/PulsarDecoderTest.java
index 7f41e49..89e4871 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/PulsarDecoderTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/PulsarDecoderTest.java
@@ -18,7 +18,7 @@
*/
package org.apache.pulsar.common.protocol;
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.CALLS_REAL_METHODS;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
index 7fd48c2..83a7549 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.flink.streaming.connectors.pulsar;
+import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.avro.generated.NasaMission;
@@ -30,7 +31,6 @@ import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.mockito.Mockito;
-import org.mockito.internal.util.reflection.Whitebox;
import org.powermock.api.mockito.PowerMockito;
import org.testng.annotations.Test;
@@ -109,10 +109,10 @@ public class PulsarAvroTableSinkTest {
Mockito.any(SerializationSchema.class),
Mockito.any(PulsarKeyExtractor.class)
).thenReturn(producer);
- Whitebox.setInternalState(sink, "fieldNames", fieldNames);
- Whitebox.setInternalState(sink, "fieldTypes", typeInformations);
- Whitebox.setInternalState(sink, "serializationSchema", Mockito.mock(SerializationSchema.class));
- Whitebox.setInternalState(sink, "keyExtractor", Mockito.mock(PulsarKeyExtractor.class));
+ FieldUtils.writeField(sink, "fieldNames", fieldNames, true);
+ FieldUtils.writeField(sink, "fieldTypes", typeInformations, true);
+ FieldUtils.writeField(sink, "serializationSchema", Mockito.mock(SerializationSchema.class), true);
+ FieldUtils.writeField(sink, "keyExtractor", Mockito.mock(PulsarKeyExtractor.class), true);
return sink;
}
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java
index b96f971..82d831b 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java
@@ -38,7 +38,7 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.MessageImpl;
-import org.apache.pulsar.shade.io.netty.buffer.Unpooled;
+import io.netty.buffer.Unpooled;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
@@ -57,7 +57,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
/**
* Tests for the PulsarConsumerSource. The source supports two operation modes.
@@ -542,7 +542,7 @@ public class PulsarConsumerSourceTests {
private static Message<byte[]> createMessage(String content, String messageId) {
return new MessageImpl<byte[]>("my-topic", messageId, Collections.emptyMap(),
- Unpooled.wrappedBuffer(content.getBytes()), Schema.BYTES);
+ content.getBytes(), Schema.BYTES);
}
private static String createMessageId(long ledgerId, long entryId, long partitionIndex) {
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
index 668a8e5..02462b3 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.flink.streaming.connectors.pulsar;
+import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -29,7 +30,6 @@ import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.mockito.Mockito;
-import org.mockito.internal.util.reflection.Whitebox;
import org.powermock.api.mockito.PowerMockito;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -105,10 +105,11 @@ public class PulsarJsonTableSinkTest {
Mockito.any(SerializationSchema.class),
Mockito.any(PulsarKeyExtractor.class)
).thenReturn(producer);
- Whitebox.setInternalState(sink, "fieldNames", fieldNames);
- Whitebox.setInternalState(sink, "fieldTypes", typeInformations);
- Whitebox.setInternalState(sink, "serializationSchema", Mockito.mock(SerializationSchema.class));
- Whitebox.setInternalState(sink, "keyExtractor", Mockito.mock(PulsarKeyExtractor.class));
+
+ FieldUtils.writeField(sink, "fieldNames", fieldNames, true);
+ FieldUtils.writeField(sink, "fieldTypes", typeInformations, true);
+ FieldUtils.writeField(sink, "serializationSchema", Mockito.mock(SerializationSchema.class), true);
+ FieldUtils.writeField(sink, "keyExtractor", Mockito.mock(PulsarKeyExtractor.class), true);
return sink;
}
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index fd81260..4843362 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -46,7 +46,7 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T>
private final PulsarSourceConfig pulsarSourceConfig;
private final Map<String, String> properties;
private List<String> inputTopics;
- private List<Consumer<T>> inputConsumers;
+ private List<Consumer<T>> inputConsumers = Collections.emptyList();
private final TopicSchema topicSchema;
public PulsarSource(PulsarClient pulsarClient, PulsarSourceConfig pulsarConfig, Map<String, String> properties) {
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index 43a5649..8ababe4 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -18,7 +18,24 @@
*/
package org.apache.pulsar.functions.instance;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.same;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
import io.prometheus.client.CollectorRegistry;
+
+import java.nio.ByteBuffer;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
@@ -31,26 +48,10 @@ import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.instance.state.StateContextImpl;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider;
-import org.mockito.Matchers;
import org.slf4j.Logger;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import java.nio.ByteBuffer;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.same;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
/**
* Unit test {@link ContextImpl}.
*/
@@ -72,7 +73,7 @@ public class ContextImplTest {
logger = mock(Logger.class);
client = mock(PulsarClientImpl.class);
when(client.newProducer()).thenReturn(new ProducerBuilderImpl(client, Schema.BYTES));
- when(client.createProducerAsync(Matchers.any(ProducerConfigurationData.class), Matchers.any(Schema.class), eq(null)))
+ when(client.createProducerAsync(any(ProducerConfigurationData.class), any(), any()))
.thenReturn(CompletableFuture.completedFuture(producer));
when(client.getSchema(anyString())).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
when(producer.sendAsync(anyString())).thenReturn(CompletableFuture.completedFuture(null));
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java
deleted file mode 100644
index a11062f..0000000
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java
+++ /dev/null
@@ -1,1012 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-//package org.apache.pulsar.functions.instance;
-//
-//import static java.nio.charset.StandardCharsets.UTF_8;
-//import static org.mockito.Matchers.any;
-//import static org.mockito.Matchers.anyList;
-//import static org.mockito.Matchers.anyLong;
-//import static org.mockito.Matchers.anyString;
-//import static org.mockito.Matchers.eq;
-//import static org.mockito.Matchers.same;
-//import static org.mockito.Mockito.doAnswer;
-//import static org.mockito.Mockito.doNothing;
-//import static org.mockito.Mockito.mock;
-//import static org.mockito.Mockito.spy;
-//import static org.mockito.Mockito.times;
-//import static org.mockito.Mockito.verify;
-//import static org.mockito.Mockito.when;
-//import static org.testng.Assert.assertEquals;
-//import static org.testng.Assert.assertNull;
-//import static org.testng.Assert.assertSame;
-//import static org.testng.Assert.assertTrue;
-//
-//import io.netty.buffer.ByteBuf;
-//import java.util.Arrays;
-//import java.util.Collections;
-//import java.util.HashMap;
-//import java.util.HashSet;
-//import java.util.LinkedList;
-//import java.util.List;
-//import java.util.Map;
-//import java.util.Set;
-//import java.util.TreeMap;
-//import java.util.concurrent.CompletableFuture;
-//import java.util.concurrent.ExecutorService;
-//import java.util.concurrent.Executors;
-//import java.util.concurrent.LinkedBlockingQueue;
-//import java.util.concurrent.TimeUnit;
-//import lombok.Cleanup;
-//import lombok.Data;
-//import lombok.extern.slf4j.Slf4j;
-//import org.apache.bookkeeper.api.StorageClient;
-//import org.apache.bookkeeper.api.kv.Table;
-//import org.apache.bookkeeper.clients.StorageClientBuilder;
-//import org.apache.bookkeeper.clients.admin.StorageAdminClient;
-//import org.apache.bookkeeper.clients.config.StorageClientSettings;
-//import org.apache.bookkeeper.common.concurrent.FutureUtils;
-//import org.apache.bookkeeper.stream.proto.StreamProperties;
-//import org.apache.commons.lang3.tuple.Pair;
-//import org.apache.pulsar.client.api.Consumer;
-//import org.apache.pulsar.client.api.ConsumerConfiguration;
-//import org.apache.pulsar.client.api.Message;
-//import org.apache.pulsar.client.api.MessageBuilder;
-//import org.apache.pulsar.client.api.MessageId;
-//import org.apache.pulsar.client.api.Producer;
-//import org.apache.pulsar.client.api.ProducerConfiguration;
-//import org.apache.pulsar.client.api.PulsarClient;
-//import org.apache.pulsar.client.impl.MessageIdImpl;
-//import org.apache.pulsar.client.impl.PulsarClientImpl;
-//import org.apache.pulsar.functions.api.Context;
-//import org.apache.pulsar.functions.api.Function;
-//import org.apache.pulsar.functions.api.utils.DefaultSerDe;
-//import org.apache.pulsar.functions.instance.processors.AtLeastOnceProcessor;
-//import org.apache.pulsar.functions.instance.processors.MessageProcessor;
-//import org.apache.pulsar.functions.proto.Function.FunctionDetails;
-//import org.apache.pulsar.functions.proto.Function.FunctionDetails.ProcessingGuarantees;
-//import org.apache.pulsar.functions.utils.Reflections;
-//import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
-//import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
-//import org.apache.pulsar.functions.utils.Utils;
-//import org.powermock.api.mockito.PowerMockito;
-//import org.powermock.core.classloader.annotations.PowerMockIgnore;
-//import org.powermock.core.classloader.annotations.PrepareForTest;
-//import org.powermock.reflect.Whitebox;
-//import org.testng.IObjectFactory;
-//import org.testng.annotations.BeforeMethod;
-//import org.testng.annotations.ObjectFactory;
-//import org.testng.annotations.Test;
-//
-///**
-// * Test the processing logic of a {@link JavaInstanceRunnable}.
-// */
-//@Slf4j
-//@PrepareForTest({ JavaInstanceRunnable.class, StorageClientBuilder.class, MessageBuilder.class, Reflections.class })
-//@PowerMockIgnore({ "javax.management.*", "org.apache.pulsar.common.api.proto.*", "org.apache.logging.log4j.*", "org/apache/pulsar/common/api/proto/PulsarApi*", "org.apache.pulsar.common.util.protobuf.*", "org.apache.pulsar.shade.*" })
-//public class JavaInstanceRunnableProcessTest {
-//
-// @ObjectFactory
-// public IObjectFactory getObjectFactory() {
-// return new org.powermock.modules.testng.PowerMockObjectFactory();
-// }
-//
-// private static class TestFunction implements Function<String, String> {
-// @Override
-// public String process(String input, Context context) throws Exception {
-// return input + "!";
-// }
-// }
-//
-// private static class TestFailureFunction implements Function<String, String> {
-//
-// private int processId2Count = 0;
-//
-// @Override
-// public String process(String input, Context context) throws Exception {
-// int id = Integer.parseInt(input.replace("message-", ""));
-// if (id % 2 == 0) {
-// if (id == 2) {
-// processId2Count++;
-// if (processId2Count > 1) {
-// return input + "!";
-// }
-// }
-// throw new Exception("Failed to process message " + id);
-// }
-// return input + "!";
-// }
-// }
-//
-// private static class TestVoidFunction implements Function<String, Void> {
-//
-// @Override
-// public Void process(String input, Context context) throws Exception {
-// log.info("process input '{}'", input);
-// voidFunctionQueue.put(input);
-// return null;
-// }
-// }
-//
-// @Data
-// private static class ConsumerInstance {
-// private final Consumer consumer;
-// private final ConsumerConfiguration conf;
-// private final TreeMap<MessageId, Message> messages;
-//
-// public ConsumerInstance(Consumer consumer,
-// ConsumerConfiguration conf) {
-// this.consumer = consumer;
-// this.conf = conf;
-// this.messages = new TreeMap<>();
-// }
-//
-// public synchronized int getNumMessages() {
-// return this.messages.size();
-// }
-//
-// public synchronized void addMessage(Message message) {
-// this.messages.put(message.getMessageId(), message);
-// }
-//
-// public synchronized void removeMessage(MessageId msgId) {
-// this.messages.remove(msgId);
-// }
-//
-// public synchronized boolean containMessage(MessageId msgId) {
-// return this.messages.containsKey(msgId);
-// }
-//
-// public synchronized void removeMessagesBefore(MessageId targetMsgId) {
-// Set<MessageId> messagesToRemove = new HashSet<>();
-// messages.forEach((msgId, message) -> {
-// if (msgId.compareTo(targetMsgId) <= 0) {
-// messagesToRemove.add(msgId);
-// }
-// });
-// for (MessageId msgId : messagesToRemove) {
-// messages.remove(msgId);
-// }
-// }
-// }
-//
-// @Data
-// private static class ProducerInstance {
-// private final Producer producer;
-// private final LinkedBlockingQueue<Message> msgQueue;
-// private final List<CompletableFuture<MessageId>> sendFutures;
-//
-// public ProducerInstance(Producer producer,
-// LinkedBlockingQueue<Message> msgQueue) {
-// this.producer = producer;
-// this.msgQueue = msgQueue;
-// this.sendFutures = new LinkedList<>();
-// }
-//
-// public synchronized void addSendFuture(CompletableFuture<MessageId> future) {
-// this.sendFutures.add(future);
-// }
-//
-// }
-//
-//
-// private static final String TEST_STORAGE_SERVICE_URL = "127.0.0.1:4181";
-// private static final LinkedBlockingQueue<String> voidFunctionQueue = new LinkedBlockingQueue<>();
-//
-// private FunctionDetails functionDetails;
-// private InstanceConfig config;
-// private FunctionCacheManager fnCache;
-// private PulsarClient mockClient;
-// private FunctionStats mockFunctionStats;
-//
-// private final Map<Pair<String, String>, ProducerInstance> mockProducers
-// = Collections.synchronizedMap(new HashMap<>());
-// private final Map<Pair<String, String>, ConsumerInstance> mockConsumers
-// = Collections.synchronizedMap(new HashMap<>());
-// private StorageClient mockStorageClient;
-// private Table<ByteBuf, ByteBuf> mockTable;
-//
-// @BeforeMethod
-// public void setup() throws Exception {
-// mockProducers.clear();
-// mockConsumers.clear();
-//
-// functionDetails = FunctionDetails.newBuilder()
-// .setAutoAck(true)
-// .setClassName(TestFunction.class.getName())
-// .addInputs("test-src-topic")
-// .setName("test-function")
-// .setOutput("test-output-topic")
-// .setProcessingGuarantees(ProcessingGuarantees.ATLEAST_ONCE)
-// .setTenant("test-tenant")
-// .setNamespace("test-namespace")
-// .build();
-//
-// config = new InstanceConfig();
-// config.setFunctionId("test-function-id");
-// config.setFunctionVersion("v1");
-// config.setInstanceId("test-instance-id");
-// config.setMaxBufferedTuples(1000);
-// config.setFunctionDetails(functionDetails);
-//
-// mockClient = mock(PulsarClientImpl.class);
-//
-// // mock FunctionCacheManager
-// fnCache = mock(FunctionCacheManager.class);
-// doNothing().when(fnCache).registerFunctionInstance(anyString(), anyString(), anyList(), anyList());
-// doNothing().when(fnCache).unregisterFunctionInstance(anyString(), anyString());
-//
-// ClassLoader clsLoader = JavaInstanceRunnableTest.class.getClassLoader();
-// when(fnCache.getClassLoader(anyString()))
-// .thenReturn(clsLoader);
-//
-// // mock producer & consumer
-// when(mockClient.createProducer(anyString(), any(ProducerConfiguration.class)))
-// .thenAnswer(invocationOnMock -> {
-// String topic = invocationOnMock.getArgumentAt(0, String.class);
-// ProducerConfiguration conf = invocationOnMock.getArgumentAt(1, ProducerConfiguration.class);
-// String producerName = conf.getProducerName();
-//
-// Pair<String, String> pair = Pair.of(topic, producerName);
-// ProducerInstance producerInstance = mockProducers.get(pair);
-// if (null == producerInstance) {
-// Producer producer = mock(Producer.class);
-// LinkedBlockingQueue<Message> msgQueue = new LinkedBlockingQueue<>();
-// final ProducerInstance instance = new ProducerInstance(producer, msgQueue);
-// producerInstance = instance;
-// when(producer.getProducerName())
-// .thenReturn(producerName);
-// when(producer.getTopic())
-// .thenReturn(topic);
-// when(producer.sendAsync(any(Message.class)))
-// .thenAnswer(invocationOnMock1 -> {
-// Message msg = invocationOnMock1.getArgumentAt(0, Message.class);
-// log.info("producer send message {}", msg);
-//
-// CompletableFuture<MessageId> future = new CompletableFuture<>();
-// instance.addSendFuture(future);
-// msgQueue.put(msg);
-// return future;
-// });
-// when(producer.closeAsync()).thenReturn(FutureUtils.Void());
-//
-// mockProducers.put(pair, producerInstance);
-// }
-// return producerInstance.getProducer();
-// });
-// when(mockClient.subscribe(
-// anyString(),
-// anyString(),
-// any(ConsumerConfiguration.class)
-// )).thenAnswer(invocationOnMock -> {
-// String topic = invocationOnMock.getArgumentAt(0, String.class);
-// String subscription = invocationOnMock.getArgumentAt(1, String.class);
-// ConsumerConfiguration conf = invocationOnMock.getArgumentAt(2, ConsumerConfiguration.class);
-//
-// Pair<String, String> pair = Pair.of(topic, subscription);
-// ConsumerInstance consumerInstance = mockConsumers.get(pair);
-// if (null == consumerInstance) {
-// Consumer consumer = mock(Consumer.class);
-//
-// ConsumerInstance instance = new ConsumerInstance(consumer, conf);
-// consumerInstance = instance;
-// when(consumer.getTopic()).thenReturn(topic);
-// when(consumer.getSubscription()).thenReturn(subscription);
-// when(consumer.acknowledgeAsync(any(Message.class)))
-// .thenAnswer(invocationOnMock1 -> {
-// Message msg = invocationOnMock1.getArgumentAt(0, Message.class);
-// log.info("Ack message {} : message id = {}", msg, msg.getMessageId());
-//
-// instance.removeMessage(msg.getMessageId());
-// return FutureUtils.Void();
-// });
-// when(consumer.acknowledgeCumulativeAsync(any(Message.class)))
-// .thenAnswer(invocationOnMock1 -> {
-// Message msg = invocationOnMock1.getArgumentAt(0, Message.class);
-// log.info("Ack message cumulatively message id = {}", msg, msg.getMessageId());
-//
-// instance.removeMessagesBefore(msg.getMessageId());
-// return FutureUtils.Void();
-// });
-// when(consumer.closeAsync())
-// .thenAnswer(invocationOnMock1 -> {
-// mockConsumers.remove(pair, instance);
-// return FutureUtils.Void();
-// });
-// doAnswer(invocationOnMock1 -> {
-// mockConsumers.remove(pair, instance);
-// return null;
-// }).when(consumer).close();
-//
-//
-// mockConsumers.put(pair, consumerInstance);
-// }
-// return consumerInstance.getConsumer();
-// });
-//
-// //
-// // Mock State Store
-// //
-//
-// StorageClientBuilder mockBuilder = mock(StorageClientBuilder.class);
-// when(mockBuilder.withNamespace(anyString())).thenReturn(mockBuilder);
-// when(mockBuilder.withSettings(any(StorageClientSettings.class))).thenReturn(mockBuilder);
-// this.mockStorageClient = mock(StorageClient.class);
-// when(mockBuilder.build()).thenReturn(mockStorageClient);
-// StorageAdminClient adminClient = mock(StorageAdminClient.class);
-// when(mockBuilder.buildAdmin()).thenReturn(adminClient);
-//
-// PowerMockito.mockStatic(StorageClientBuilder.class);
-// PowerMockito.when(
-// StorageClientBuilder.newBuilder()
-// ).thenReturn(mockBuilder);
-//
-// when(adminClient.getStream(anyString(), anyString())).thenReturn(FutureUtils.value(
-// StreamProperties.newBuilder().build()));
-// mockTable = mock(Table.class);
-// when(mockStorageClient.openTable(anyString())).thenReturn(FutureUtils.value(mockTable));
-//
-// //
-// // Mock Function Stats
-// //
-//
-// mockFunctionStats = spy(new FunctionStats());
-// PowerMockito.whenNew(FunctionStats.class)
-// .withNoArguments()
-// .thenReturn(mockFunctionStats);
-//
-// // Mock message builder
-// PowerMockito.mockStatic(MessageBuilder.class);
-// PowerMockito.when(MessageBuilder.create())
-// .thenAnswer(invocationOnMock -> {
-//
-// Message msg = mock(Message.class);
-// MessageBuilder builder = mock(MessageBuilder.class);
-// when(builder.setContent(any(byte[].class)))
-// .thenAnswer(invocationOnMock1 -> {
-// byte[] content = invocationOnMock1.getArgumentAt(0, byte[].class);
-// when(msg.getData()).thenReturn(content);
-// return builder;
-// });
-// when(builder.setSequenceId(anyLong()))
-// .thenAnswer(invocationOnMock1 -> {
-// long seqId = invocationOnMock1.getArgumentAt(0, long.class);
-// when(msg.getSequenceId()).thenReturn(seqId);
-// return builder;
-// });
-// when(builder.setProperty(anyString(), anyString()))
-// .thenAnswer(invocationOnMock1 -> {
-// String key = invocationOnMock1.getArgumentAt(0, String.class);
-// String value = invocationOnMock1.getArgumentAt(1, String.class);
-// when(msg.getProperty(eq(key))).thenReturn(value);
-// return builder;
-// });
-// when(builder.build()).thenReturn(msg);
-// return builder;
-// });
-// }
-//
-// /**
-// * Test the basic run logic of instance.
-// */
-// @Test
-// public void testSetupJavaInstance() throws Exception {
-// JavaInstanceRunnable runnable = new JavaInstanceRunnable(
-// config,
-// fnCache,
-// "test-jar-file",
-// mockClient,
-// TEST_STORAGE_SERVICE_URL);
-//
-// runnable.setupJavaInstance();
-//
-// // verify
-//
-// // 1. verify jar is loaded
-// verify(fnCache, times(1))
-// .registerFunctionInstance(
-// eq(config.getFunctionId()),
-// eq(config.getInstanceId()),
-// eq(Arrays.asList("test-jar-file")),
-// eq(Collections.emptyList())
-// );
-// verify(fnCache, times(1))
-// .getClassLoader(eq(config.getFunctionId()));
-//
-// // 2. verify serde is setup
-// for (String inputTopic : functionDetails.getInputsList()) {
-// assertTrue(runnable.getInputSerDe().containsKey(inputTopic));
-// assertTrue(runnable.getInputSerDe().get(inputTopic) instanceof DefaultSerDe);
-// DefaultSerDe serDe = (DefaultSerDe) runnable.getInputSerDe().get(inputTopic);
-// assertEquals(String.class, Whitebox.getInternalState(serDe, "type"));
-// }
-//
-// // 3. verify producers and consumers are setup
-// MessageProcessor processor = runnable.getProcessor();
-// assertTrue(processor instanceof AtLeastOnceProcessor);
-// assertSame(mockProducers.get(Pair.of(
-// functionDetails.getOutput(),
-// null
-// )).getProducer(), ((AtLeastOnceProcessor) processor).getProducer());
-//
-// assertEquals(mockConsumers.size(), processor.getInputConsumers().size());
-// for (Map.Entry<String, Consumer> consumerEntry : processor.getInputConsumers().entrySet()) {
-// String topic = consumerEntry.getKey();
-//
-// Consumer mockConsumer = mockConsumers.get(Pair.of(
-// topic,
-// FunctionDetailsUtils.getFullyQualifiedName(functionDetails))).getConsumer();
-// assertSame(mockConsumer, consumerEntry.getValue());
-// }
-//
-// // 4. verify state table
-// assertSame(mockStorageClient, runnable.getStorageClient());
-// assertSame(mockTable, runnable.getStateTable());
-//
-// runnable.close();
-//
-// // verify close
-// for (ConsumerInstance consumer : mockConsumers.values()) {
-// verify(consumer.getConsumer(), times(1)).close();
-// }
-// assertTrue(processor.getInputConsumers().isEmpty());
-//
-// for (ProducerInstance producer : mockProducers.values()) {
-// verify(producer.getProducer(), times(1)).close();
-// }
-//
-// verify(mockTable, times(1)).close();
-// verify(mockStorageClient, times(1)).close();
-//
-// // function is unregistered
-// verify(fnCache, times(1)).unregisterFunctionInstance(
-// eq(config.getFunctionId()), eq(config.getInstanceId()));
-//
-// }
-//
-// @Test
-// public void testAtMostOnceProcessing() throws Exception {
-// FunctionDetails newFunctionDetails = FunctionDetails.newBuilder(functionDetails)
-// .setProcessingGuarantees(ProcessingGuarantees.ATMOST_ONCE)
-// .build();
-// config.setFunctionDetails(newFunctionDetails);
-//
-// @Cleanup("shutdown")
-// ExecutorService executorService = Executors.newSingleThreadExecutor();
-//
-// try (JavaInstanceRunnable runnable = new JavaInstanceRunnable(
-// config,
-// fnCache,
-// "test-jar-file",
-// mockClient,
-// null)) {
-//
-// executorService.submit(runnable);
-//
-// Pair<String, String> consumerId = Pair.of(
-// newFunctionDetails.getInputs(0),
-// FunctionDetailsUtils.getFullyQualifiedName(newFunctionDetails));
-// ConsumerInstance consumerInstance = mockConsumers.get(consumerId);
-// while (null == consumerInstance) {
-// TimeUnit.MILLISECONDS.sleep(20);
-// consumerInstance = mockConsumers.get(consumerId);
-// }
-//
-// ProducerInstance producerInstance = mockProducers.values().iterator().next();
-//
-// // once we get consumer id, simulate receiving 10 messages from consumer
-// for (int i = 0; i < 10; i++) {
-// Message msg = mock(Message.class);
-// when(msg.getData()).thenReturn(("message-" + i).getBytes(UTF_8));
-// when(msg.getMessageId())
-// .thenReturn(new MessageIdImpl(1L, i, 0));
-// consumerInstance.addMessage(msg);
-// consumerInstance.getConf().getMessageListener()
-// .received(consumerInstance.getConsumer(), msg);
-// }
-//
-// // wait until all the messages are published
-// for (int i = 0; i < 10; i++) {
-// Message msg = producerInstance.msgQueue.take();
-//
-// assertEquals("message-" + i + "!", new String(msg.getData(), UTF_8));
-// // sequence id is not set for AT_MOST_ONCE processing
-// assertEquals(0L, msg.getSequenceId());
-// }
-//
-// // verify acknowledge before send completes
-// verify(consumerInstance.getConsumer(), times(10))
-// .acknowledgeAsync(any(Message.class));
-// assertEquals(0, consumerInstance.getNumMessages());
-//
-// // complete all the publishes
-// synchronized (producerInstance) {
-// for (CompletableFuture<MessageId> future : producerInstance.sendFutures) {
-// future.complete(mock(MessageId.class));
-// }
-// }
-//
-// // acknowledges count should remain same
-// verify(consumerInstance.getConsumer(), times(10))
-// .acknowledgeAsync(any(Message.class));
-// }
-// }
-//
-// @Test
-// public void testAtMostOnceProcessingFailures() throws Exception {
-// FunctionDetails newFunctionDetails = FunctionDetails.newBuilder(functionDetails)
-// .setProcessingGuarantees(ProcessingGuarantees.ATMOST_ONCE)
-// .setClassName(TestFailureFunction.class.getName())
-// .build();
-// config.setFunctionDetails(newFunctionDetails);
-//
-// @Cleanup("shutdown")
-// ExecutorService executorService = Executors.newSingleThreadExecutor();
-//
-// try (JavaInstanceRunnable runnable = new JavaInstanceRunnable(
-// config,
-// fnCache,
-// "test-jar-file",
-// mockClient,
-// null)) {
-//
-// executorService.submit(runnable);
-//
-// Pair<String, String> consumerId = Pair.of(
-// newFunctionDetails.getInputs(0),
-// FunctionDetailsUtils.getFullyQualifiedName(newFunctionDetails));
-// ConsumerInstance consumerInstance = mockConsumers.get(consumerId);
-// while (null == consumerInstance) {
-// TimeUnit.MILLISECONDS.sleep(20);
-// consumerInstance = mockConsumers.get(consumerId);
-// }
-//
-// ProducerInstance producerInstance = mockProducers.values().iterator().next();
-//
-// // once we get consumer id, simulate receiving 10 messages from consumer
-// for (int i = 0; i < 10; i++) {
-// Message msg = mock(Message.class);
-// when(msg.getData()).thenReturn(("message-" + i).getBytes(UTF_8));
-// when(msg.getMessageId())
-// .thenReturn(new MessageIdImpl(1L, i, 0));
-// consumerInstance.addMessage(msg);
-// consumerInstance.getConf().getMessageListener()
-// .received(consumerInstance.getConsumer(), msg);
-// }
-//
-// // wait until all the messages are published
-// for (int i = 0; i < 10; i++) {
-// if (i % 2 == 0) { // all messages (i % 2 == 0) will fail to process.
-// continue;
-// }
-//
-// Message msg = producerInstance.msgQueue.take();
-//
-// assertEquals("message-" + i + "!", new String(msg.getData(), UTF_8));
-// // sequence id is not set for AT_MOST_ONCE processing
-// assertEquals(0L, msg.getSequenceId());
-// }
-//
-// // verify acknowledge before send completes
-// verify(consumerInstance.getConsumer(), times(10))
-// .acknowledgeAsync(any(Message.class));
-// assertEquals(0, consumerInstance.getNumMessages());
-//
-// // complete all the publishes
-// synchronized (producerInstance) {
-// for (CompletableFuture<MessageId> future : producerInstance.sendFutures) {
-// future.complete(mock(MessageId.class));
-// }
-// }
-//
-// // acknowledges count should remain same
-// verify(consumerInstance.getConsumer(), times(10))
-// .acknowledgeAsync(any(Message.class));
-// assertEquals(0, consumerInstance.getNumMessages());
-// }
-// }
-//
-// @Test
-// public void testAtLeastOnceProcessing() throws Exception {
-// FunctionDetails newFunctionDetails = FunctionDetails.newBuilder(functionDetails)
-// .setProcessingGuarantees(ProcessingGuarantees.ATLEAST_ONCE)
-// .build();
-// config.setFunctionDetails(newFunctionDetails);
-//
-// @Cleanup("shutdown")
-// ExecutorService executorService = Executors.newSingleThreadExecutor();
-//
-// try (JavaInstanceRunnable runnable = new JavaInstanceRunnable(
-// config,
-// fnCache,
-// "test-jar-file",
-// mockClient,
-// null)) {
-//
-// executorService.submit(runnable);
-//
-// Pair<String, String> consumerId = Pair.of(
-// newFunctionDetails.getInputs(0),
-// FunctionDetailsUtils.getFullyQualifiedName(newFunctionDetails));
-// ConsumerInstance consumerInstance = mockConsumers.get(consumerId);
-// while (null == consumerInstance) {
-// TimeUnit.MILLISECONDS.sleep(20);
-// consumerInstance = mockConsumers.get(consumerId);
-// }
-//
-// ProducerInstance producerInstance = mockProducers.values().iterator().next();
-//
-// // once we get consumer id, simulate receiving 10 messages from consumer
-// for (int i = 0; i < 10; i++) {
-// Message msg = mock(Message.class);
-// when(msg.getData()).thenReturn(("message-" + i).getBytes(UTF_8));
-// when(msg.getMessageId())
-// .thenReturn(new MessageIdImpl(1L, i, 0));
-// consumerInstance.addMessage(msg);
-// consumerInstance.getConf().getMessageListener()
-// .received(consumerInstance.getConsumer(), msg);
-// }
-//
-// // wait until all the messages are published
-// for (int i = 0; i < 10; i++) {
-// Message msg = producerInstance.msgQueue.take();
-//
-// assertEquals("message-" + i + "!", new String(msg.getData(), UTF_8));
-// // sequence id is not set for AT_MOST_ONCE processing
-// assertEquals(0L, msg.getSequenceId());
-// }
-//
-// // verify acknowledge before send completes
-// verify(consumerInstance.getConsumer(), times(0))
-// .acknowledgeAsync(any(Message.class));
-// assertEquals(10, consumerInstance.getNumMessages());
-//
-// // complete all the publishes
-// synchronized (producerInstance) {
-// for (CompletableFuture<MessageId> future : producerInstance.sendFutures) {
-// future.complete(mock(MessageId.class));
-// }
-// }
-//
-// // acknowledges count should remain same
-// verify(consumerInstance.getConsumer(), times(10))
-// .acknowledgeAsync(any(Message.class));
-// assertEquals(0, consumerInstance.getNumMessages());
-// }
-// }
-//
-// @Test
-// public void testAtLeastOnceProcessingFailures() throws Exception {
-// FunctionDetails newFunctionDetails = FunctionDetails.newBuilder(functionDetails)
-// .setProcessingGuarantees(ProcessingGuarantees.ATLEAST_ONCE)
-// .setClassName(TestFailureFunction.class.getName())
-// .build();
-// config.setFunctionDetails(newFunctionDetails);
-//
-// @Cleanup("shutdown")
-// ExecutorService executorService = Executors.newSingleThreadExecutor();
-//
-// try (JavaInstanceRunnable runnable = new JavaInstanceRunnable(
-// config,
-// fnCache,
-// "test-jar-file",
-// mockClient,
-// null)) {
-//
-// executorService.submit(runnable);
-//
-// Pair<String, String> consumerId = Pair.of(
-// newFunctionDetails.getInputs(0),
-// FunctionDetailsUtils.getFullyQualifiedName(newFunctionDetails));
-// ConsumerInstance consumerInstance = mockConsumers.get(consumerId);
-// while (null == consumerInstance) {
-// TimeUnit.MILLISECONDS.sleep(20);
-// consumerInstance = mockConsumers.get(consumerId);
-// }
-//
-// ProducerInstance producerInstance = mockProducers.values().iterator().next();
-//
-// // once we get consumer id, simulate receiving 10 messages from consumer
-// for (int i = 0; i < 10; i++) {
-// Message msg = mock(Message.class);
-// when(msg.getData()).thenReturn(("message-" + i).getBytes(UTF_8));
-// when(msg.getMessageId())
-// .thenReturn(new MessageIdImpl(1L, i, 0));
-// consumerInstance.addMessage(msg);
-// consumerInstance.getConf().getMessageListener()
-// .received(consumerInstance.getConsumer(), msg);
-// }
-//
-// // wait until all the messages are published
-// for (int i = 0; i < 10; i++) {
-// if (i % 2 == 0) { // all messages (i % 2 == 0) will fail to process.
-// continue;
-// }
-//
-// Message msg = producerInstance.msgQueue.take();
-//
-// assertEquals("message-" + i + "!", new String(msg.getData(), UTF_8));
-// // sequence id is not set for AT_MOST_ONCE processing
-// assertEquals(0L, msg.getSequenceId());
-// }
-//
-// // verify acknowledge before send completes
-// verify(consumerInstance.getConsumer(), times(0))
-// .acknowledgeAsync(any(Message.class));
-// assertEquals(10, consumerInstance.getNumMessages());
-//
-// // complete all the publishes
-// synchronized (producerInstance) {
-// for (CompletableFuture<MessageId> future : producerInstance.sendFutures) {
-// future.complete(mock(MessageId.class));
-// }
-// }
-//
-// // only 5 succeed messages are acknowledged
-// verify(consumerInstance.getConsumer(), times(5))
-// .acknowledgeAsync(any(Message.class));
-// assertEquals(5, consumerInstance.getNumMessages());
-// for (int i = 0; i < 10; i++) {
-// assertEquals(
-// i % 2 == 0,
-// consumerInstance.containMessage(new MessageIdImpl(1L, i, 0)));
-// }
-// }
-// }
-//
-// @Test
-// public void testEffectivelyOnceProcessing() throws Exception {
-// FunctionDetails newFunctionDetails = FunctionDetails.newBuilder(functionDetails)
-// .setProcessingGuarantees(ProcessingGuarantees.EFFECTIVELY_ONCE)
-// .build();
-// config.setFunctionDetails(newFunctionDetails);
-//
-// @Cleanup("shutdown")
-// ExecutorService executorService = Executors.newSingleThreadExecutor();
-//
-// try (JavaInstanceRunnable runnable = new JavaInstanceRunnable(
-// config,
-// fnCache,
-// "test-jar-file",
-// mockClient,
-// null)) {
-//
-// executorService.submit(runnable);
-//
-// Pair<String, String> consumerId = Pair.of(
-// newFunctionDetails.getInputs(0),
-// FunctionDetailsUtils.getFullyQualifiedName(newFunctionDetails));
-// ConsumerInstance consumerInstance = mockConsumers.get(consumerId);
-// while (null == consumerInstance) {
-// TimeUnit.MILLISECONDS.sleep(20);
-// consumerInstance = mockConsumers.get(consumerId);
-// }
-//
-// // once we get consumer id, simulate receiving 10 messages from consumer
-// for (int i = 0; i < 10; i++) {
-// Message msg = mock(Message.class);
-// when(msg.getData()).thenReturn(("message-" + i).getBytes(UTF_8));
-// when(msg.getMessageId())
-// .thenReturn(new MessageIdImpl(1L, i, 0));
-// consumerInstance.addMessage(msg);
-// consumerInstance.getConf().getMessageListener()
-// .received(consumerInstance.getConsumer(), msg);
-// }
-//
-// ProducerInstance producerInstance;
-// while (mockProducers.isEmpty()) {
-// TimeUnit.MILLISECONDS.sleep(20);
-// }
-// producerInstance = mockProducers.values().iterator().next();
-//
-// // wait until all the messages are published
-// for (int i = 0; i < 10; i++) {
-// Message msg = producerInstance.msgQueue.take();
-//
-// assertEquals("message-" + i + "!", new String(msg.getData(), UTF_8));
-// // sequence id is not set for AT_MOST_ONCE processing
-// assertEquals(
-// Utils.getSequenceId(
-// new MessageIdImpl(1L, i, 0)),
-// msg.getSequenceId());
-// }
-//
-// // verify acknowledge before send completes
-// verify(consumerInstance.getConsumer(), times(0))
-// .acknowledgeCumulativeAsync(any(Message.class));
-// assertEquals(10, consumerInstance.getNumMessages());
-//
-// // complete all the publishes
-// synchronized (producerInstance) {
-// for (CompletableFuture<MessageId> future : producerInstance.sendFutures) {
-// future.complete(mock(MessageId.class));
-// }
-// }
-//
-// // acknowledges count should remain same
-// verify(consumerInstance.getConsumer(), times(10))
-// .acknowledgeCumulativeAsync(any(Message.class));
-// assertEquals(0, consumerInstance.getNumMessages());
-// }
-// }
-//
-// @Test
-// public void testEffectivelyOnceProcessingFailures() throws Exception {
-// FunctionDetails newFunctionDetails = FunctionDetails.newBuilder(functionDetails)
-// .setProcessingGuarantees(ProcessingGuarantees.EFFECTIVELY_ONCE)
-// .setClassName(TestFailureFunction.class.getName())
-// .build();
-// config.setFunctionDetails(newFunctionDetails);
-//
-// @Cleanup("shutdown")
-// ExecutorService executorService = Executors.newSingleThreadExecutor();
-//
-// try (JavaInstanceRunnable runnable = new JavaInstanceRunnable(
-// config,
-// fnCache,
-// "test-jar-file",
-// mockClient,
-// null)) {
-//
-// executorService.submit(runnable);
-//
-// Pair<String, String> consumerId = Pair.of(
-// newFunctionDetails.getInputs(0),
-// FunctionDetailsUtils.getFullyQualifiedName(newFunctionDetails));
-// ConsumerInstance consumerInstance = mockConsumers.get(consumerId);
-// while (null == consumerInstance) {
-// TimeUnit.MILLISECONDS.sleep(20);
-// consumerInstance = mockConsumers.get(consumerId);
-// }
-//
-// // once we get consumer id, simulate receiving 2 messages from consumer
-// Message[] msgs = new Message[2];
-// for (int i = 1; i <= 2; i++) {
-// Message msg = mock(Message.class);
-// when(msg.getData()).thenReturn(("message-" + i).getBytes(UTF_8));
-// when(msg.getMessageId())
-// .thenReturn(new MessageIdImpl(1L, i, 0));
-//
-// msgs[i-1] = msg;
-//
-// consumerInstance.addMessage(msg);
-// consumerInstance.getConf().getMessageListener()
-// .received(consumerInstance.getConsumer(), msg);
-// }
-//
-// ProducerInstance producerInstance;
-// while (mockProducers.isEmpty()) {
-// TimeUnit.MILLISECONDS.sleep(20);
-// }
-// producerInstance = mockProducers.values().iterator().next();
-//
-// // only first message is published, the second message is not
-// Message msg = producerInstance.msgQueue.take();
-// assertEquals("message-1!", new String(msg.getData(), UTF_8));
-// assertEquals(
-// Utils.getSequenceId(
-// new MessageIdImpl(1L, 1, 0)),
-// msg.getSequenceId());
-// assertNull(producerInstance.msgQueue.poll());
-//
-// // the first result message is sent but the send future is not completed yet
-// // so no acknowledge would happen
-// verify(consumerInstance.getConsumer(), times(0))
-// .acknowledgeCumulativeAsync(any(Message.class));
-//
-// // since the second message failed to process, for correctness, the instance
-// // will close the existing consumer and resubscribe
-// ConsumerInstance secondInstance = mockConsumers.get(consumerId);
-// while (null == secondInstance || secondInstance == consumerInstance) {
-// TimeUnit.MILLISECONDS.sleep(20);
-// secondInstance = mockConsumers.get(consumerId);
-// }
-//
-// Message secondMsg = mock(Message.class);
-// when(secondMsg.getData()).thenReturn("message-2".getBytes(UTF_8));
-// when(secondMsg.getMessageId())
-// .thenReturn(new MessageIdImpl(1L, 2, 0));
-// secondInstance.addMessage(secondMsg);
-// secondInstance.getConf().getMessageListener()
-// .received(secondInstance.getConsumer(), secondMsg);
-//
-// Message secondReceivedMsg = producerInstance.msgQueue.take();
-// assertEquals("message-2!", new String(secondReceivedMsg.getData(), UTF_8));
-// assertEquals(
-// Utils.getSequenceId(
-// new MessageIdImpl(1L, 2, 0)),
-// secondReceivedMsg.getSequenceId());
-//
-// // the first result message is sent
-// verify(secondInstance.getConsumer(), times(0))
-// .acknowledgeCumulativeAsync(any(Message.class));
-//
-// // complete all the publishes
-// synchronized (producerInstance) {
-// assertEquals(2, producerInstance.sendFutures.size());
-// for (CompletableFuture<MessageId> future : producerInstance.sendFutures) {
-// future.complete(mock(MessageId.class));
-// }
-// }
-//
-// // all 2 messages are sent
-// verify(consumerInstance.getConsumer(), times(1))
-// .acknowledgeCumulativeAsync(same(msgs[0]));
-// verify(consumerInstance.getConsumer(), times(0))
-// .acknowledgeCumulativeAsync(same(msgs[1]));
-// verify(consumerInstance.getConsumer(), times(0))
-// .acknowledgeCumulativeAsync(same(secondMsg));
-// verify(secondInstance.getConsumer(), times(0))
-// .acknowledgeCumulativeAsync(same(msgs[0]));
-// verify(secondInstance.getConsumer(), times(0))
-// .acknowledgeCumulativeAsync(same(msgs[1]));
-// }
-// }
-//
-// @Test
-// public void testVoidFunction() throws Exception {
-// FunctionDetails newFunctionDetails = FunctionDetails.newBuilder(functionDetails)
-// .setProcessingGuarantees(ProcessingGuarantees.ATLEAST_ONCE)
-// .setClassName(TestVoidFunction.class.getName())
-// .build();
-// config.setFunctionDetails(newFunctionDetails);
-//
-// @Cleanup("shutdown")
-// ExecutorService executorService = Executors.newSingleThreadExecutor();
-//
-// try (JavaInstanceRunnable runnable = new JavaInstanceRunnable(
-// config,
-// fnCache,
-// "test-jar-file",
-// mockClient,
-// null)) {
-//
-// executorService.submit(runnable);
-//
-// Pair<String, String> consumerId = Pair.of(
-// newFunctionDetails.getInputs(0),
-// FunctionDetailsUtils.getFullyQualifiedName(newFunctionDetails));
-// ConsumerInstance consumerInstance = mockConsumers.get(consumerId);
-// while (null == consumerInstance) {
-// TimeUnit.MILLISECONDS.sleep(20);
-// consumerInstance = mockConsumers.get(consumerId);
-// }
-//
-// // once we get consumer id, simulate receiving 10 messages from consumer
-// for (int i = 0; i < 10; i++) {
-// Message msg = mock(Message.class);
-// when(msg.getData()).thenReturn(("message-" + i).getBytes(UTF_8));
-// when(msg.getMessageId())
-// .thenReturn(new MessageIdImpl(1L, i, 0));
-// consumerInstance.addMessage(msg);
-// consumerInstance.getConf().getMessageListener()
-// .received(consumerInstance.getConsumer(), msg);
-// }
-//
-// // wait until all the messages are published
-// for (int i = 0; i < 10; i++) {
-// String msg = voidFunctionQueue.take();
-// log.info("Processed message {}", msg);
-// assertEquals("message-" + i, msg);
-// }
-//
-// // no producer should be initialized
-// assertTrue(mockProducers.isEmpty());
-// }
-// }
-//}
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/StateContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/StateContextImplTest.java
index 2805a3d..04d0482 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/StateContextImplTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/StateContextImplTest.java
@@ -28,9 +28,9 @@ import org.testng.annotations.Test;
import java.nio.ByteBuffer;
import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
index 1ef72c7..700267e 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
@@ -18,9 +18,30 @@
*/
package org.apache.pulsar.functions.sink;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
+
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.MessageId;
@@ -38,31 +59,10 @@ import org.apache.pulsar.functions.instance.SinkRecord;
import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
import org.apache.pulsar.functions.source.TopicSchema;
import org.apache.pulsar.io.core.SinkContext;
-import org.mockito.ArgumentMatcher;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyList;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.argThat;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.testng.AssertJUnit.assertEquals;
-import static org.testng.AssertJUnit.assertTrue;
-import static org.testng.AssertJUnit.fail;
-
@Slf4j
public class PulsarSinkTest {
@@ -109,7 +109,7 @@ public class PulsarSinkTest {
doReturn(producerBuilder).when(producerBuilder).property(anyString(), anyString());
doReturn(producerBuilder).when(producerBuilder).properties(any());
doReturn(producerBuilder).when(producerBuilder).sendTimeout(anyInt(), any());
-
+
CompletableFuture completableFuture = new CompletableFuture<>();
completableFuture.complete(mock(MessageId.class));
TypedMessageBuilder typedMessageBuilder = mock(TypedMessageBuilder.class);
@@ -297,14 +297,11 @@ public class PulsarSinkTest {
} else {
Assert.assertTrue(pulsarSinkAtLeastOnceProcessor.publishProducers.containsKey(defaultTopic));
}
- verify(pulsarClient.newProducer(), times(1)).topic(argThat(new ArgumentMatcher<String>() {
-
- @Override
- public boolean matches(Object o) {
- if (o instanceof String) {
- return getTopicEquals(o, topic, defaultTopic);
- }
- return false;
+ verify(pulsarClient.newProducer(), times(1)).topic(argThat(otherTopic -> {
+ if (topic != null) {
+ return topic.equals(otherTopic);
+ } else {
+ return defaultTopic.equals(otherTopic);
}
}));
}
@@ -346,15 +343,8 @@ public class PulsarSinkTest {
} else {
Assert.assertTrue(pulsarSinkAtLeastOnceProcessor.publishProducers.containsKey(defaultTopic));
}
- verify(pulsarClient.newProducer(), times(1)).topic(argThat(new ArgumentMatcher<String>() {
-
- @Override
- public boolean matches(Object o) {
- if (o instanceof String) {
- return getTopicEquals(o, topic, defaultTopic);
- }
- return false;
- }
+ verify(pulsarClient.newProducer(), times(1)).topic(argThat(o -> {
+ return getTopicEquals(o, topic, defaultTopic);
}));
}
@@ -408,28 +398,15 @@ public class PulsarSinkTest {
} else {
Assert.assertTrue(pulsarSinkEffectivelyOnceProcessor.publishProducers.containsKey(String.format("%s-%s-id-1", defaultTopic, defaultTopic)));
}
- verify(pulsarClient.newProducer(), times(1)).topic(argThat(new ArgumentMatcher<String>() {
- @Override
- public boolean matches(Object o) {
- if (o instanceof String) {
- return getTopicEquals(o, topic, defaultTopic);
- }
- return false;
- }
+ verify(pulsarClient.newProducer(), times(1)).topic(argThat(o -> {
+ return getTopicEquals(o, topic, defaultTopic);
}));
- verify(pulsarClient.newProducer(), times(1)).producerName(argThat(new ArgumentMatcher<String>() {
-
- @Override
- public boolean matches(Object o) {
- if (o instanceof String) {
- if (topic != null) {
- return String.format("%s-id-1", topic).equals(o);
- } else {
- return String.format("%s-id-1", defaultTopic).equals(o);
- }
- }
- return false;
+ verify(pulsarClient.newProducer(), times(1)).producerName(argThat(o -> {
+ if (topic != null) {
+ return String.format("%s-id-1", topic).equals(o);
+ } else {
+ return String.format("%s-id-1", defaultTopic).equals(o);
}
}));
}
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
index 43b4bcb..4438bf0 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
@@ -18,10 +18,11 @@
*/
package org.apache.pulsar.functions.source;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyList;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.anyString;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.testng.AssertJUnit.assertEquals;
@@ -29,12 +30,12 @@ import static org.testng.AssertJUnit.assertFalse;
import static org.testng.AssertJUnit.assertTrue;
import static org.testng.AssertJUnit.fail;
-import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import lombok.Cleanup;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
@@ -43,9 +44,9 @@ import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.io.core.SourceContext;
import org.testng.annotations.Test;
@@ -76,14 +77,14 @@ public class PulsarSourceTest {
*/
private static PulsarClientImpl getPulsarClient() throws PulsarClientException {
PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
- ConsumerBuilder consumerBuilder = mock(ConsumerBuilder.class);
+ ConsumerBuilder<?> consumerBuilder = mock(ConsumerBuilder.class);
doReturn(consumerBuilder).when(consumerBuilder).topics(anyList());
doReturn(consumerBuilder).when(consumerBuilder).cryptoFailureAction(any());
- doReturn(consumerBuilder).when(consumerBuilder).subscriptionName(anyString());
+ doReturn(consumerBuilder).when(consumerBuilder).subscriptionName(any());
doReturn(consumerBuilder).when(consumerBuilder).subscriptionType(any());
doReturn(consumerBuilder).when(consumerBuilder).ackTimeout(anyLong(), any());
doReturn(consumerBuilder).when(consumerBuilder).messageListener(any());
- Consumer consumer = mock(Consumer.class);
+ Consumer<?> consumer = mock(Consumer.class);
doReturn(consumer).when(consumerBuilder).subscribe();
doReturn(consumerBuilder).when(pulsarClient).newConsumer(any());
doReturn(CompletableFuture.completedFuture(consumer)).when(consumerBuilder).subscribeAsync();
@@ -120,11 +121,13 @@ public class PulsarSourceTest {
@Test
- public void testVoidInputClasses() throws IOException {
+ public void testVoidInputClasses() throws Exception {
PulsarSourceConfig pulsarConfig = getPulsarConfigs();
// set type to void
pulsarConfig.setTypeClassName(Void.class.getName());
- PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, new HashMap<>());
+
+ @Cleanup
+ PulsarSource<?> pulsarSource = new PulsarSource<>(getPulsarClient(), pulsarConfig, new HashMap<>());
try {
pulsarSource.open(new HashMap<>(), mock(SourceContext.class));
@@ -142,7 +145,7 @@ public class PulsarSourceTest {
* Verify that function input type should be consistent with input serde type.
*/
@Test
- public void testInconsistentInputType() throws IOException {
+ public void testInconsistentInputType() throws Exception {
PulsarSourceConfig pulsarConfig = getPulsarConfigs();
// set type to be inconsistent to that of SerDe
pulsarConfig.setTypeClassName(Integer.class.getName());
@@ -150,7 +153,9 @@ public class PulsarSourceTest {
topicSerdeClassNameMap.put("persistent://sample/standalone/ns1/test_result",
ConsumerConfig.builder().serdeClassName(TestSerDe.class.getName()).build());
pulsarConfig.setTopicSchema(topicSerdeClassNameMap);
- PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, new HashMap<>());
+
+ @Cleanup
+ PulsarSource<?> pulsarSource = new PulsarSource<>(getPulsarClient(), pulsarConfig, new HashMap<>());
try {
pulsarSource.open(new HashMap<>(), mock(SourceContext.class));
fail("Should fail constructing java instance if function type is inconsistent with serde type");
@@ -175,7 +180,9 @@ public class PulsarSourceTest {
consumerConfigs.put("persistent://sample/standalone/ns1/test_result",
ConsumerConfig.builder().serdeClassName(TopicSchema.DEFAULT_SERDE).build());
pulsarConfig.setTopicSchema(consumerConfigs);
- PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, new HashMap<>());
+
+ @Cleanup
+ PulsarSource<?> pulsarSource = new PulsarSource<>(getPulsarClient(), pulsarConfig, new HashMap<>());
pulsarSource.open(new HashMap<>(), mock(SourceContext.class));
}
@@ -188,7 +195,9 @@ public class PulsarSourceTest {
consumerConfigs.put("persistent://sample/standalone/ns1/test_result",
ConsumerConfig.builder().serdeClassName(ComplexSerDe.class.getName()).build());
pulsarConfig.setTopicSchema(consumerConfigs);
- PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, new HashMap<>());
+
+ @Cleanup
+ PulsarSource<?> pulsarSource = new PulsarSource<>(getPulsarClient(), pulsarConfig, new HashMap<>());
pulsarSource.setupConsumerConfigs();
}
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
index ac96b44..67e9ccc 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
@@ -40,8 +40,7 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.*;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
@@ -49,7 +48,6 @@ import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
/**
* Unit tests for {@link WindowFunctionExecutor}
*/
-@Slf4j
public class WindowFunctionExecutorTest {
private static class TestWindowFunctionExecutor extends WindowFunctionExecutor<Long, Long> {
@@ -206,22 +204,22 @@ public class WindowFunctionExecutorTest {
public void testExecuteWithLateTupleStream() throws Exception {
windowConfig.setLateDataTopic("$late");
- Mockito.doReturn(Optional.of(new Gson().fromJson(new Gson().toJson(windowConfig), Map.class)))
+ doReturn(Optional.of(new Gson().fromJson(new Gson().toJson(windowConfig), Map.class)))
.when(context).getUserConfigValue(WindowConfig.WINDOW_CONFIG_KEY);
- TypedMessageBuilder typedMessageBuilder = Mockito.mock(TypedMessageBuilder.class);
- Mockito.when(typedMessageBuilder.value(anyString())).thenReturn(typedMessageBuilder);
- Mockito.when(typedMessageBuilder.sendAsync()).thenReturn(CompletableFuture.anyOf());
- Mockito.when(context.newOutputMessage(anyString(), anyObject())).thenReturn(typedMessageBuilder);
+ TypedMessageBuilder typedMessageBuilder = mock(TypedMessageBuilder.class);
+ when(typedMessageBuilder.value(any())).thenReturn(typedMessageBuilder);
+ when(typedMessageBuilder.sendAsync()).thenReturn(CompletableFuture.anyOf());
+ when(context.newOutputMessage(anyString(), any())).thenReturn(typedMessageBuilder);
long[] timestamps = {603, 605, 607, 618, 626, 636, 600};
List<Long> events = new ArrayList<>(timestamps.length);
for (long ts : timestamps) {
events.add(ts);
- Record<?> record = Mockito.mock(Record.class);
- Mockito.doReturn(Optional.of("test-topic")).when(record).getTopicName();
- Mockito.doReturn(record).when(context).getCurrentRecord();
- Mockito.doReturn(ts).when(record).getValue();
+ Record<?> record = mock(Record.class);
+ doReturn(Optional.of("test-topic")).when(record).getTopicName();
+ doReturn(record).when(context).getCurrentRecord();
+ doReturn(ts).when(record).getValue();
testWindowedPulsarFunction.process(ts, context);
//Update the watermark to this timestamp
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProviderTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProviderTest.java
index d31767a..4c63a4f 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProviderTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProviderTest.java
@@ -18,16 +18,23 @@
*/
package org.apache.pulsar.functions.auth;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
import io.kubernetes.client.ApiException;
import io.kubernetes.client.apis.CoreV1Api;
import io.kubernetes.client.models.V1Container;
import io.kubernetes.client.models.V1PodSpec;
import io.kubernetes.client.models.V1PodTemplateSpec;
import io.kubernetes.client.models.V1Secret;
-import io.kubernetes.client.models.V1ServiceAccount;
import io.kubernetes.client.models.V1StatefulSet;
import io.kubernetes.client.models.V1StatefulSetSpec;
-import lombok.extern.slf4j.Slf4j;
+
+import java.util.Collections;
+import java.util.Optional;
+
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
@@ -35,16 +42,7 @@ import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.testng.Assert;
import org.testng.annotations.Test;
-import java.util.Collections;
-import java.util.Optional;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-@Slf4j
public class KubernetesSecretsTokenAuthProviderTest {
@Test
diff --git a/pulsar-functions/secrets/src/test/java/org/apache/pulsar/functions/secretsprovider/EnvironmentBasedSecretsProviderTest.java b/pulsar-functions/secrets/src/test/java/org/apache/pulsar/functions/secretsprovider/EnvironmentBasedSecretsProviderTest.java
index 3392e9a..3ebb526 100644
--- a/pulsar-functions/secrets/src/test/java/org/apache/pulsar/functions/secretsprovider/EnvironmentBasedSecretsProviderTest.java
+++ b/pulsar-functions/secrets/src/test/java/org/apache/pulsar/functions/secretsprovider/EnvironmentBasedSecretsProviderTest.java
@@ -19,12 +19,12 @@
package org.apache.pulsar.functions.secretsprovider;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
import java.lang.reflect.Field;
import java.util.Map;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
/**
* Unit test of {@link Exceptions}.
*/
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ActionsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ActionsTest.java
index 309e466..f728f67 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ActionsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ActionsTest.java
@@ -23,7 +23,7 @@ import org.testng.annotations.Test;
import java.util.function.Supplier;
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinatorTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinatorTest.java
index 5d2fc3f..763465b 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinatorTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinatorTest.java
@@ -18,8 +18,8 @@
*/
package org.apache.pulsar.functions.worker;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
index c93569b..ca21c8b 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
@@ -18,8 +18,7 @@
*/
package org.apache.pulsar.functions.worker;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
index 75e83bf..f16bd3f 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
@@ -18,9 +18,9 @@
*/
package org.apache.pulsar.functions.worker;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.argThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@@ -30,7 +30,6 @@ import static org.mockito.Mockito.when;
import java.util.HashMap;
import java.util.Map;
-import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
@@ -43,7 +42,6 @@ import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;
-@Slf4j
public class FunctionMetaDataManagerTest {
private static PulsarClient mockPulsarClient() throws PulsarClientException {
@@ -113,25 +111,20 @@ public class FunctionMetaDataManagerTest {
verify(functionMetaDataManager, times(1)).submit(any(Request.ServiceRequest.class));
verify(functionMetaDataManager).submit(argThat(new ArgumentMatcher<Request.ServiceRequest>() {
@Override
- public boolean matches(Object o) {
- if (o instanceof Request.ServiceRequest) {
- Request.ServiceRequest serviceRequest = (Request.ServiceRequest) o;
- if (!serviceRequest.getWorkerId().equals(workerConfig.getWorkerId())) {
- return false;
- }
- if (!serviceRequest.getServiceRequestType().equals(Request.ServiceRequest.ServiceRequestType
- .UPDATE)) {
- return false;
- }
- if (!serviceRequest.getFunctionMetaData().equals(m1)) {
- return false;
- }
- if (serviceRequest.getFunctionMetaData().getVersion() != 0) {
- return false;
- }
- return true;
+ public boolean matches(Request.ServiceRequest serviceRequest) {
+ if (!serviceRequest.getWorkerId().equals(workerConfig.getWorkerId())) {
+ return false;
}
- return false;
+ if (!serviceRequest.getServiceRequestType().equals(Request.ServiceRequest.ServiceRequestType.UPDATE)) {
+ return false;
+ }
+ if (!serviceRequest.getFunctionMetaData().equals(m1)) {
+ return false;
+ }
+ if (serviceRequest.getFunctionMetaData().getVersion() != 0) {
+ return false;
+ }
+ return true;
}
}));
@@ -154,23 +147,20 @@ public class FunctionMetaDataManagerTest {
verify(functionMetaDataManager, times(1)).submit(any(Request.ServiceRequest.class));
verify(functionMetaDataManager).submit(argThat(new ArgumentMatcher<Request.ServiceRequest>() {
@Override
- public boolean matches(Object o) {
- if (o instanceof Request.ServiceRequest) {
- Request.ServiceRequest serviceRequest = (Request.ServiceRequest) o;
- if (!serviceRequest.getWorkerId().equals(workerConfig.getWorkerId())) return false;
- if (!serviceRequest.getServiceRequestType().equals(
- Request.ServiceRequest.ServiceRequestType.UPDATE)) {
- return false;
- }
- if (!serviceRequest.getFunctionMetaData().getFunctionDetails().equals(m2.getFunctionDetails())) {
- return false;
- }
- if (serviceRequest.getFunctionMetaData().getVersion() != (version + 1)) {
- return false;
- }
- return true;
+ public boolean matches(Request.ServiceRequest serviceRequest) {
+ if (!serviceRequest.getWorkerId().equals(workerConfig.getWorkerId()))
+ return false;
+ if (!serviceRequest.getServiceRequestType().equals(
+ Request.ServiceRequest.ServiceRequestType.UPDATE)) {
+ return false;
}
- return false;
+ if (!serviceRequest.getFunctionMetaData().getFunctionDetails().equals(m2.getFunctionDetails())) {
+ return false;
+ }
+ if (serviceRequest.getFunctionMetaData().getVersion() != (version + 1)) {
+ return false;
+ }
+ return true;
}
}));
@@ -205,36 +195,30 @@ public class FunctionMetaDataManagerTest {
functionMetaDataManager.changeFunctionInstanceStatus("tenant-1", "namespace-1", "func-1", 0, false);
verify(functionMetaDataManager, times(1)).submit(any(Request.ServiceRequest.class));
- verify(functionMetaDataManager).submit(argThat(new ArgumentMatcher<Request.ServiceRequest>() {
- @Override
- public boolean matches(Object o) {
- if (o instanceof Request.ServiceRequest) {
- Request.ServiceRequest serviceRequest = (Request.ServiceRequest) o;
- if (!serviceRequest.getWorkerId().equals(workerConfig.getWorkerId())) return false;
- if (!serviceRequest.getServiceRequestType().equals(
- Request.ServiceRequest.ServiceRequestType.UPDATE)) {
- return false;
- }
- if (!serviceRequest.getFunctionMetaData().getFunctionDetails().equals(f1.getFunctionDetails())) {
- return false;
- }
- if (serviceRequest.getFunctionMetaData().getVersion() != (version + 1)) {
- return false;
- }
- Map<Integer, Function.FunctionState> stateMap = serviceRequest.getFunctionMetaData().getInstanceStatesMap();
- if (stateMap == null || stateMap.isEmpty()) {
- return false;
- }
- if (stateMap.get(1) != Function.FunctionState.RUNNING) {
- return false;
- }
- if (stateMap.get(0) != Function.FunctionState.STOPPED) {
- return false;
- }
- return true;
- }
+ verify(functionMetaDataManager).submit(argThat(serviceRequest -> {
+ if (!serviceRequest.getWorkerId().equals(workerConfig.getWorkerId()))
+ return false;
+ if (!serviceRequest.getServiceRequestType().equals(
+ Request.ServiceRequest.ServiceRequestType.UPDATE)) {
+ return false;
+ }
+ if (!serviceRequest.getFunctionMetaData().getFunctionDetails().equals(f1.getFunctionDetails())) {
+ return false;
+ }
+ if (serviceRequest.getFunctionMetaData().getVersion() != (version + 1)) {
+ return false;
+ }
+ Map<Integer, Function.FunctionState> stateMap = serviceRequest.getFunctionMetaData().getInstanceStatesMap();
+ if (stateMap == null || stateMap.isEmpty()) {
+ return false;
+ }
+ if (stateMap.get(1) != Function.FunctionState.RUNNING) {
+ return false;
+ }
+ if (stateMap.get(0) != Function.FunctionState.STOPPED) {
return false;
}
+ return true;
}));
}
@@ -261,23 +245,20 @@ public class FunctionMetaDataManagerTest {
verify(functionMetaDataManager, times(1)).submit(any(Request.ServiceRequest.class));
verify(functionMetaDataManager).submit(argThat(new ArgumentMatcher<Request.ServiceRequest>() {
@Override
- public boolean matches(Object o) {
- if (o instanceof Request.ServiceRequest) {
- Request.ServiceRequest serviceRequest = (Request.ServiceRequest) o;
- if (!serviceRequest.getWorkerId().equals(workerConfig.getWorkerId())) return false;
- if (!serviceRequest.getServiceRequestType().equals(
- Request.ServiceRequest.ServiceRequestType.DELETE)) {
- return false;
- }
- if (!serviceRequest.getFunctionMetaData().getFunctionDetails().equals(m1.getFunctionDetails())) {
- return false;
- }
- if (serviceRequest.getFunctionMetaData().getVersion() != (version + 1)) {
- return false;
- }
- return true;
+ public boolean matches(Request.ServiceRequest serviceRequest) {
+ if (!serviceRequest.getWorkerId().equals(workerConfig.getWorkerId()))
+ return false;
+ if (!serviceRequest.getServiceRequestType().equals(
+ Request.ServiceRequest.ServiceRequestType.DELETE)) {
+ return false;
}
- return false;
+ if (!serviceRequest.getFunctionMetaData().getFunctionDetails().equals(m1.getFunctionDetails())) {
+ return false;
+ }
+ if (serviceRequest.getFunctionMetaData().getVersion() != (version + 1)) {
+ return false;
+ }
+ return true;
}
}));
}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailerTest.java
index fee6021..bad0fff 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailerTest.java
@@ -18,7 +18,7 @@
*/
package org.apache.pulsar.functions.worker;
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -79,6 +79,6 @@ public class FunctionMetaDataTopicTailerTest {
receiveFuture.thenApply(Function.identity()).get();
verify(reader, times(2)).readNextAsync();
- verify(fsm, times(1)).processRequest(any(MessageId.class), any(ServiceRequest.class));
+ verify(fsm, times(1)).processRequest(any(), any(ServiceRequest.class));
}
}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index b009567..399b355 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -44,11 +44,11 @@ import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyBoolean;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.argThat;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
@@ -131,16 +131,11 @@ public class FunctionRuntimeManagerTest {
verify(functionActioner, times(1)).startFunction(any(FunctionRuntimeInfo.class));
verify(functionActioner).startFunction(argThat(new ArgumentMatcher<FunctionRuntimeInfo>() {
@Override
- public boolean matches(Object o) {
- if (o instanceof FunctionRuntimeInfo) {
- FunctionRuntimeInfo functionRuntimeInfo = (FunctionRuntimeInfo) o;
-
- if (!functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().equals(function1)) {
- return false;
- }
- return true;
+ public boolean matches(FunctionRuntimeInfo functionRuntimeInfo) {
+ if (!functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().equals(function1)) {
+ return false;
}
- return false;
+ return true;
}
}));
verify(functionActioner, times(0)).stopFunction(any(FunctionRuntimeInfo.class));
@@ -231,16 +226,11 @@ public class FunctionRuntimeManagerTest {
verify(functionActioner, times(1)).terminateFunction(any(FunctionRuntimeInfo.class));
verify(functionActioner).terminateFunction(argThat(new ArgumentMatcher<FunctionRuntimeInfo>() {
@Override
- public boolean matches(Object o) {
- if (o instanceof FunctionRuntimeInfo) {
- FunctionRuntimeInfo functionRuntimeInfo = (FunctionRuntimeInfo) o;
-
- if (!functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().equals(function1)) {
- return false;
- }
- return true;
+ public boolean matches(FunctionRuntimeInfo functionRuntimeInfo) {
+ if (!functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().equals(function1)) {
+ return false;
}
- return false;
+ return true;
}
}));
@@ -329,32 +319,22 @@ public class FunctionRuntimeManagerTest {
verify(functionActioner).stopFunction(argThat(new ArgumentMatcher<FunctionRuntimeInfo>() {
@Override
- public boolean matches(Object o) {
- if (o instanceof FunctionRuntimeInfo) {
- FunctionRuntimeInfo functionRuntimeInfo = (FunctionRuntimeInfo) o;
-
- if (!functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().equals(function2)) {
- return false;
- }
- return true;
+ public boolean matches(FunctionRuntimeInfo functionRuntimeInfo) {
+ if (!functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().equals(function2)) {
+ return false;
}
- return false;
+ return true;
}
}));
verify(functionActioner, times(1)).startFunction(any(FunctionRuntimeInfo.class));
verify(functionActioner).startFunction(argThat(new ArgumentMatcher<FunctionRuntimeInfo>() {
@Override
- public boolean matches(Object o) {
- if (o instanceof FunctionRuntimeInfo) {
- FunctionRuntimeInfo functionRuntimeInfo = (FunctionRuntimeInfo) o;
-
- if (!functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().equals(function2)) {
- return false;
- }
- return true;
+ public boolean matches(FunctionRuntimeInfo functionRuntimeInfo) {
+ if (!functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().equals(function2)) {
+ return false;
}
- return false;
+ return true;
}
}));
@@ -385,19 +365,11 @@ public class FunctionRuntimeManagerTest {
// make sure terminate is not called since this is a update operation
verify(functionActioner, times(0)).terminateFunction(any(FunctionRuntimeInfo.class));
- verify(functionActioner).stopFunction(argThat(new ArgumentMatcher<FunctionRuntimeInfo>() {
- @Override
- public boolean matches(Object o) {
- if (o instanceof FunctionRuntimeInfo) {
- FunctionRuntimeInfo functionRuntimeInfo = (FunctionRuntimeInfo) o;
-
- if (!functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().equals(function2)) {
- return false;
- }
- return true;
+ verify(functionActioner).stopFunction(argThat(functionRuntimeInfo -> {
+ if (!functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().equals(function2)) {
+ return false;
}
- return false;
- }
+ return true;
}));
verify(functionActioner, times(0)).startFunction(any(FunctionRuntimeInfo.class));
@@ -630,33 +602,17 @@ public class FunctionRuntimeManagerTest {
verify(functionActioner, times(1)).stopFunction(any(FunctionRuntimeInfo.class));
verify(functionActioner, times(0)).terminateFunction(any(FunctionRuntimeInfo.class));
- verify(functionActioner).startFunction(argThat(new ArgumentMatcher<FunctionRuntimeInfo>() {
- @Override
- public boolean matches(Object o) {
- if (o instanceof FunctionRuntimeInfo) {
- FunctionRuntimeInfo functionRuntimeInfo = (FunctionRuntimeInfo) o;
-
- if (!functionRuntimeInfo.getFunctionInstance().equals(assignment1.getInstance())) {
- return false;
- }
- return true;
+ verify(functionActioner).startFunction(argThat(functionRuntimeInfo -> {
+ if (!functionRuntimeInfo.getFunctionInstance().equals(assignment1.getInstance())) {
+ return false;
}
- return false;
- }
+ return true;
}));
- verify(functionActioner).stopFunction(argThat(new ArgumentMatcher<FunctionRuntimeInfo>() {
- @Override
- public boolean matches(Object o) {
- if (o instanceof FunctionRuntimeInfo) {
- FunctionRuntimeInfo functionRuntimeInfo = (FunctionRuntimeInfo) o;
-
- if (functionRuntimeInfo.getRuntimeSpawner() != null) {
- return false;
- }
- return true;
+ verify(functionActioner).stopFunction(argThat(functionRuntimeInfo -> {
+ if (functionRuntimeInfo.getRuntimeSpawner() != null) {
+ return false;
}
- return false;
- }
+ return true;
}));
Assert.assertEquals(functionRuntimeManager.functionRuntimeInfoMap.size(), 1);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
index cfbfa47..494c93e 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
@@ -18,9 +18,9 @@
*/
package org.apache.pulsar.functions.worker;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.argThat;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@@ -30,7 +30,6 @@ import static org.mockito.Mockito.when;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
-import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
@@ -48,7 +47,6 @@ import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.functions.proto.Function;
-import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -85,7 +83,7 @@ public class MembershipManagerTest {
AtomicReference<ConsumerEventListener> listenerHolder = new AtomicReference<>();
when(mockConsumerBuilder.consumerEventListener(any(ConsumerEventListener.class))).thenAnswer(invocationOnMock -> {
- ConsumerEventListener listener = invocationOnMock.getArgumentAt(0, ConsumerEventListener.class);
+ ConsumerEventListener listener = invocationOnMock.getArgument(0);
listenerHolder.set(listener);
return mockConsumerBuilder;
@@ -270,20 +268,7 @@ public class MembershipManagerTest {
membershipManager.checkFailures(functionMetaDataManager, functionRuntimeManager, schedulerManager);
verify(functionRuntimeManager, times(1)).removeAssignments(
- argThat(new ArgumentMatcher<Collection<Function.Assignment>>() {
- @Override
- public boolean matches(Object o) {
- if (o instanceof Collection) {
- Collection<Function.Assignment> assignments = (Collection) o;
-
- if (!assignments.contains(assignment2)) {
- return false;
- }
- return true;
- }
- return false;
- }
- }));
+ argThat(assignments -> assignments.contains(assignment2)));
verify(schedulerManager, times(1)).schedule();
}
@@ -437,5 +422,5 @@ public class MembershipManagerTest {
verify(functionRuntimeManager, times(0)).removeAssignments(any());
Assert.assertEquals(membershipManager.unsignedFunctionDurations.size(), 0);
}
-
+
}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index 9f4fa2e..1511780 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -57,10 +57,10 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyBoolean;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/dlog/DLInputStreamTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/dlog/DLInputStreamTest.java
index ab0adb4..5821e22 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/dlog/DLInputStreamTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/dlog/DLInputStreamTest.java
@@ -19,9 +19,9 @@
package org.apache.pulsar.functions.worker.dlog;
import static com.google.common.base.Charsets.UTF_8;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyBoolean;
+import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/dlog/DLOutputStreamTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/dlog/DLOutputStreamTest.java
index 5a3fa05..d7676eb 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/dlog/DLOutputStreamTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/dlog/DLOutputStreamTest.java
@@ -18,7 +18,7 @@
*/
package org.apache.pulsar.functions.worker.dlog;
-import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/executor/MockExecutorController.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/executor/MockExecutorController.java
index 151e4b1..a0f051f 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/executor/MockExecutorController.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/executor/MockExecutorController.java
@@ -19,8 +19,8 @@
package org.apache.pulsar.functions.worker.executor;
import static com.google.common.base.Preconditions.checkArgument;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.doAnswer;
import com.google.common.collect.Lists;
@@ -142,10 +142,10 @@ public class MockExecutorController {
private static Answer<ScheduledFuture<?>> answerAtFixedRate(MockExecutorController controller, int numTimes) {
return invocationOnMock -> {
- Runnable task = invocationOnMock.getArgumentAt(0, Runnable.class);
- long initialDelay = invocationOnMock.getArgumentAt(1, long.class);
- long delay = invocationOnMock.getArgumentAt(2, long.class);
- TimeUnit unit = invocationOnMock.getArgumentAt(3, TimeUnit.class);
+ Runnable task = invocationOnMock.getArgument(0);
+ long initialDelay = invocationOnMock.getArgument(1);
+ long delay = invocationOnMock.getArgument(2);
+ TimeUnit unit = invocationOnMock.getArgument(3);
DeferredTask deferredTask = null;
for (int i = 0; i < numTimes; i++) {
@@ -163,9 +163,9 @@ public class MockExecutorController {
private static Answer<ScheduledFuture<?>> answerDelay(MockExecutorController executor) {
return invocationOnMock -> {
- Runnable task = invocationOnMock.getArgumentAt(0, Runnable.class);
- long value = invocationOnMock.getArgumentAt(1, long.class);
- TimeUnit unit = invocationOnMock.getArgumentAt(2, TimeUnit.class);
+ Runnable task = invocationOnMock.getArgument(0);
+ long value = invocationOnMock.getArgument(1);
+ TimeUnit unit = invocationOnMock.getArgument(2);
DeferredTask deferredTask = executor.addDelayedTask(executor, unit.toMillis(value), task);
if (value <= 0) {
task.run();
@@ -178,7 +178,7 @@ public class MockExecutorController {
private static Answer<Future<?>> answerNow() {
return invocationOnMock -> {
- Runnable task = invocationOnMock.getArgumentAt(0, Runnable.class);
+ Runnable task = invocationOnMock.getArgument(0);
task.run();
SettableFuture<Void> future = SettableFuture.create();
future.set(null);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/request/ServiceRequestManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/request/ServiceRequestManagerTest.java
index 91fa0e6..501ce1a 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/request/ServiceRequestManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/request/ServiceRequestManagerTest.java
@@ -18,7 +18,7 @@
*/
package org.apache.pulsar.functions.worker.request;
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
index dcfa8f1..4e0c5fb 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
@@ -62,9 +62,9 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index 62cba58..20988e0 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -18,11 +18,39 @@
*/
package org.apache.pulsar.functions.worker.rest.api.v2;
+
+import static org.apache.pulsar.functions.utils.FunctionCommon.mergeJson;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.doNothing;
+import static org.powermock.api.mockito.PowerMockito.doThrow;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.testng.Assert.assertEquals;
+
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
-import lombok.extern.slf4j.Slf4j;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.StreamingOutput;
+
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.config.Configurator;
@@ -65,38 +93,11 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.ObjectFactory;
import org.testng.annotations.Test;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.StreamingOutput;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-
-import static org.apache.pulsar.functions.utils.FunctionCommon.mergeJson;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
-import static org.powermock.api.mockito.PowerMockito.doNothing;
-import static org.powermock.api.mockito.PowerMockito.doThrow;
-import static org.powermock.api.mockito.PowerMockito.mockStatic;
-import static org.testng.Assert.assertEquals;
-
/**
* Unit test of {@link FunctionsApiV2Resource}.
*/
@PrepareForTest({WorkerUtils.class, InstanceUtils.class})
@PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.functions.api.*" })
-@Slf4j
public class FunctionApiV2ResourceTest {
@ObjectFactory
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
index 6075d68..4bb08c1 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
@@ -18,8 +18,35 @@
*/
package org.apache.pulsar.functions.worker.rest.api.v3;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.doNothing;
+import static org.powermock.api.mockito.PowerMockito.doThrow;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.testng.Assert.assertEquals;
+
import com.google.common.collect.Lists;
-import lombok.extern.slf4j.Slf4j;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.StreamingOutput;
+
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.config.Configurator;
@@ -62,38 +89,11 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.ObjectFactory;
import org.testng.annotations.Test;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.StreamingOutput;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
-import static org.powermock.api.mockito.PowerMockito.doNothing;
-import static org.powermock.api.mockito.PowerMockito.doThrow;
-import static org.powermock.api.mockito.PowerMockito.mockStatic;
-import static org.testng.Assert.assertEquals;
-
/**
* Unit test of {@link FunctionsApiV2Resource}.
*/
@PrepareForTest({WorkerUtils.class, InstanceUtils.class})
@PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.functions.api.*" })
-@Slf4j
public class FunctionApiV3ResourceTest {
@ObjectFactory
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
index 41b3204..c47f9e5 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
@@ -19,7 +19,6 @@
package org.apache.pulsar.functions.worker.rest.api.v3;
import com.google.common.collect.Lists;
-import lombok.extern.slf4j.Slf4j;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.config.Configurator;
@@ -65,7 +64,6 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
-import java.nio.file.Path;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -74,9 +72,9 @@ import java.util.concurrent.CompletableFuture;
import static org.apache.pulsar.functions.proto.Function.ProcessingGuarantees.ATLEAST_ONCE;
import static org.apache.pulsar.functions.source.TopicSchema.DEFAULT_SERDE;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@@ -91,7 +89,7 @@ import static org.testng.Assert.assertEquals;
*/
@PrepareForTest({WorkerUtils.class, SinkConfigUtils.class, ConnectorUtils.class, FunctionCommon.class, InstanceUtils.class})
@PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.io.*", "java.io.*" })
-@Slf4j
+
public class SinkApiV3ResourceTest {
@ObjectFactory
@@ -819,7 +817,7 @@ public class SinkApiV3ResourceTest {
FunctionCommon.getSinkType(anyString(), any(NarClassLoader.class));
doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
- FunctionCommon.extractNarClassLoader(any(Path.class), any(File.class));
+ FunctionCommon.extractNarClassLoader(any(), any());
doReturn(ATLEAST_ONCE).when(FunctionCommon.class);
FunctionCommon.convertProcessingGuarantee(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
@@ -890,7 +888,7 @@ public class SinkApiV3ResourceTest {
FunctionCommon.getSinkType(anyString(), any(NarClassLoader.class));
doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
- FunctionCommon.extractNarClassLoader(any(Path.class), any(File.class));
+ FunctionCommon.extractNarClassLoader(any(), any());
doReturn(ATLEAST_ONCE).when(FunctionCommon.class);
FunctionCommon.convertProcessingGuarantee(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
@@ -988,7 +986,7 @@ public class SinkApiV3ResourceTest {
PowerMockito.when(FunctionCommon.class, "extractFileFromPkgURL", any()).thenCallRealMethod();
doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
- FunctionCommon.extractNarClassLoader(any(Path.class), any(File.class));
+ FunctionCommon.extractNarClassLoader(any(), any());
doReturn(ATLEAST_ONCE).when(FunctionCommon.class);
FunctionCommon.convertProcessingGuarantee(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
index 276f3c0..faf3f88 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
@@ -18,9 +18,31 @@
*/
package org.apache.pulsar.functions.worker.rest.api.v3;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.doNothing;
+import static org.powermock.api.mockito.PowerMockito.doReturn;
+import static org.powermock.api.mockito.PowerMockito.doThrow;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.testng.Assert.assertEquals;
+
import com.google.common.collect.Lists;
-import com.google.gson.Gson;
-import lombok.extern.slf4j.Slf4j;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import javax.ws.rs.core.Response;
+
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.config.Configurator;
@@ -63,35 +85,11 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.ObjectFactory;
import org.testng.annotations.Test;
-import javax.ws.rs.core.Response;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URL;
-import java.nio.file.Path;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
-import static org.powermock.api.mockito.PowerMockito.doNothing;
-import static org.powermock.api.mockito.PowerMockito.doReturn;
-import static org.powermock.api.mockito.PowerMockito.doThrow;
-import static org.powermock.api.mockito.PowerMockito.mockStatic;
-import static org.testng.Assert.assertEquals;
-
/**
* Unit test of {@link SourcesApiV3Resource}.
*/
@PrepareForTest({WorkerUtils.class, ConnectorUtils.class, FunctionCommon.class, InstanceUtils.class})
@PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.io.*" })
-@Slf4j
public class SourceApiV3ResourceTest {
@ObjectFactory
@@ -839,7 +837,7 @@ public class SourceApiV3ResourceTest {
FunctionCommon.getSourceType(anyString(), any(NarClassLoader.class));
doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
- FunctionCommon.extractNarClassLoader(any(Path.class), any(File.class));
+ FunctionCommon.extractNarClassLoader(any(), any());
this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData);
@@ -909,7 +907,7 @@ public class SourceApiV3ResourceTest {
FunctionCommon.getSourceType(anyString(), any(NarClassLoader.class));
doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
- FunctionCommon.extractNarClassLoader(any(Path.class), any(File.class));
+ FunctionCommon.extractNarClassLoader(any(), any(File.class));
this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData);
@@ -1003,7 +1001,7 @@ public class SourceApiV3ResourceTest {
PowerMockito.when(FunctionCommon.class, "extractFileFromPkgURL", any()).thenCallRealMethod();
doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
- FunctionCommon.extractNarClassLoader(any(Path.class), any(File.class));
+ FunctionCommon.extractNarClassLoader(any(), any());
this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData);
diff --git a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileConsumerThreadTests.java b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileConsumerThreadTests.java
index a21633d..a4582bc 100644
--- a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileConsumerThreadTests.java
+++ b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileConsumerThreadTests.java
@@ -18,7 +18,7 @@
*/
package org.apache.pulsar.io.file;
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
diff --git a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileListingThreadTests.java b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileListingThreadTests.java
index 855498e..5be101f 100644
--- a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileListingThreadTests.java
+++ b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileListingThreadTests.java
@@ -19,7 +19,7 @@
package org.apache.pulsar.io.file;
import static org.testng.Assert.fail;
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
diff --git a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/ProcessedFileThreadTests.java b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/ProcessedFileThreadTests.java
index 98d35a9..9970a55 100644
--- a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/ProcessedFileThreadTests.java
+++ b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/ProcessedFileThreadTests.java
@@ -18,7 +18,7 @@
*/
package org.apache.pulsar.io.file;
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
diff --git a/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSinkTest.java b/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSinkTest.java
index fe8941e..a746f7e 100644
--- a/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSinkTest.java
+++ b/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSinkTest.java
@@ -19,8 +19,6 @@
package org.apache.pulsar.io.influxdb;
import com.google.common.collect.Maps;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
@@ -46,7 +44,7 @@ import org.testng.annotations.Test;
import java.util.Map;
-import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -151,7 +149,7 @@ public class InfluxDBGenericRecordSinkTest {
verify(this.influxDB, times(1)).createDatabase("testDB");
doAnswer(invocationOnMock -> {
- BatchPoints batchPoints = invocationOnMock.getArgumentAt(0, BatchPoints.class);
+ BatchPoints batchPoints = invocationOnMock.getArgument(0, BatchPoints.class);
Assert.assertNotNull(batchPoints, "batchPoints should not be null.");
return null;
}).when(influxDB).write(any(BatchPoints.class));
diff --git a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java
index 3eb6f6e..7dc665d 100644
--- a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java
+++ b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java
@@ -18,21 +18,14 @@
*/
package org.apache.pulsar.io.mongodb;
+import static java.util.stream.Collectors.toList;
+
import com.google.common.collect.Lists;
import com.mongodb.MongoBulkWriteException;
import com.mongodb.async.client.MongoClient;
import com.mongodb.async.client.MongoClients;
import com.mongodb.async.client.MongoCollection;
import com.mongodb.async.client.MongoDatabase;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.Sink;
-import org.apache.pulsar.io.core.SinkContext;
-import org.apache.pulsar.io.core.annotations.Connector;
-import org.apache.pulsar.io.core.annotations.IOType;
-import org.bson.BSONException;
-import org.bson.Document;
-import org.bson.json.JsonParseException;
import java.nio.charset.Charset;
import java.util.ArrayList;
@@ -42,9 +35,19 @@ import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
import java.util.stream.IntStream;
-import static java.util.stream.Collectors.toList;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
+import org.bson.BSONException;
+import org.bson.Document;
+import org.bson.json.JsonParseException;
/**
* The base class for MongoDB sinks.
@@ -70,6 +73,15 @@ public class MongoSink implements Sink<byte[]> {
private ScheduledExecutorService flushExecutor;
+ private Supplier<MongoClient> clientProvider;
+
+ public MongoSink() {
+ this(null);
+ }
+
+ public MongoSink(Supplier<MongoClient> clientProvider) {
+ this.clientProvider = clientProvider;
+ }
@Override
public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
@@ -78,7 +90,12 @@ public class MongoSink implements Sink<byte[]> {
mongoConfig = MongoConfig.load(config);
mongoConfig.validate();
- mongoClient = MongoClients.create(mongoConfig.getMongoUri());
+ if (clientProvider != null) {
+ mongoClient = clientProvider.get();
+ } else {
+ mongoClient = MongoClients.create(mongoConfig.getMongoUri());
+ }
+
final MongoDatabase db = mongoClient.getDatabase(mongoConfig.getDatabase());
collection = db.getCollection(mongoConfig.getCollection());
diff --git a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkTest.java b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkTest.java
index c941708..c4ca44a 100644
--- a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkTest.java
+++ b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkTest.java
@@ -19,36 +19,35 @@
package org.apache.pulsar.io.mongodb;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
import com.mongodb.MongoBulkWriteException;
import com.mongodb.async.SingleResultCallback;
import com.mongodb.async.client.MongoClient;
-import com.mongodb.async.client.MongoClients;
import com.mongodb.async.client.MongoCollection;
import com.mongodb.async.client.MongoDatabase;
import com.mongodb.bulk.BulkWriteError;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.SinkContext;
import org.bson.BsonDocument;
import org.mockito.Mock;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
import org.testng.IObjectFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.ObjectFactory;
import org.testng.annotations.Test;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.*;
-
-@PrepareForTest(MongoClients.class)
-@PowerMockIgnore({"org.apache.logging.log4j.*"})
public class MongoSinkTest {
@Mock
@@ -78,7 +77,7 @@ public class MongoSinkTest {
@BeforeMethod
public void setUp() {
- sink = new MongoSink();
+
map = TestHelper.createMap(true);
mockRecord = mock(Record.class);
@@ -87,9 +86,8 @@ public class MongoSinkTest {
mockMongoDb = mock(MongoDatabase.class);
mockMongoColl = mock(MongoCollection.class);
- PowerMockito.mockStatic(MongoClients.class);
+ sink = new MongoSink(() -> mockMongoClient);
- when(MongoClients.create(anyString())).thenReturn(mockMongoClient);
when(mockMongoClient.getDatabase(anyString())).thenReturn(mockMongoDb);
when(mockMongoDb.getCollection(anyString())).thenReturn(mockMongoColl);
}
@@ -98,7 +96,7 @@ public class MongoSinkTest {
when(mockRecord.getValue()).thenReturn("{\"hello\":\"pulsar\"}".getBytes());
doAnswer((invocation) -> {
- SingleResultCallback cb = invocation.getArgumentAt(1, SingleResultCallback.class);
+ SingleResultCallback cb = invocation.getArgument(1, SingleResultCallback.class);
MongoBulkWriteException exc = null;
if (throwBulkError) {
@@ -109,17 +107,17 @@ public class MongoSinkTest {
cb.onResult(null, exc);
return null;
- }).when(mockMongoColl).insertMany(anyObject(), anyObject());
+ }).when(mockMongoColl).insertMany(any(), any());
}
private void initFailContext(String msg) {
when(mockRecord.getValue()).thenReturn(msg.getBytes());
doAnswer((invocation) -> {
- SingleResultCallback cb = invocation.getArgumentAt(1, SingleResultCallback.class);
+ SingleResultCallback cb = invocation.getArgument(1, SingleResultCallback.class);
cb.onResult(null, new Exception("Oops"));
return null;
- }).when(mockMongoColl).insertMany(anyObject(), anyObject());
+ }).when(mockMongoColl).insertMany(any(), any());
}
@AfterMethod
diff --git a/pulsar-log4j2-appender/src/main/java/org/apache/pulsar/log4j2/appender/PulsarAppender.java b/pulsar-log4j2-appender/src/main/java/org/apache/pulsar/log4j2/appender/PulsarAppender.java
index 68bfbd5..45aeca9 100644
--- a/pulsar-log4j2-appender/src/main/java/org/apache/pulsar/log4j2/appender/PulsarAppender.java
+++ b/pulsar-log4j2-appender/src/main/java/org/apache/pulsar/log4j2/appender/PulsarAppender.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.log4j2.appender;
import java.io.Serializable;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
+
import org.apache.logging.log4j.core.AbstractLifeCycle;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.Filter;
diff --git a/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/PulsarAppenderTest.java b/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/PulsarAppenderTest.java
index 71b94ae..73cbadf 100644
--- a/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/PulsarAppenderTest.java
+++ b/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/PulsarAppenderTest.java
@@ -18,10 +18,10 @@
*/
package org.apache.pulsar.log4j2.appender;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyString;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -120,11 +120,30 @@ public class PulsarAppenderTest {
doReturn(producerBuilder).when(producerBuilder).topic(anyString());
doReturn(producerBuilder).when(producerBuilder).producerName(anyString());
doReturn(producerBuilder).when(producerBuilder).enableBatching(anyBoolean());
- doReturn(producerBuilder).when(producerBuilder).batchingMaxPublishDelay(anyInt(), any(TimeUnit.class));
+ doReturn(producerBuilder).when(producerBuilder).batchingMaxPublishDelay(anyLong(), any(TimeUnit.class));
doReturn(producerBuilder).when(producerBuilder).blockIfQueueFull(anyBoolean());
doReturn(producer).when(producerBuilder).create();
when(producer.newMessage()).then(invocation -> new MockedMessageBuilder());
+ when(producer.send(any(byte[].class)))
+ .thenAnswer(invocationOnMock -> {
+ Message<byte[]> msg = invocationOnMock.getArgument(0);
+ synchronized (history) {
+ history.add(msg);
+ }
+ return null;
+ });
+
+ when(producer.sendAsync(any(byte[].class)))
+ .thenAnswer(invocationOnMock -> {
+ Message<byte[]> msg = invocationOnMock.getArgument(0);
+ synchronized (history) {
+ history.add(msg);
+ }
+ CompletableFuture<MessageId> future = new CompletableFuture<>();
+ future.complete(mock(MessageId.class));
+ return future;
+ });
PulsarManager.PULSAR_CLIENT_BUILDER = () -> clientBuilder;
diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE
index f964d37..05261a9 100644
--- a/pulsar-sql/presto-distribution/LICENSE
+++ b/pulsar-sql/presto-distribution/LICENSE
@@ -370,7 +370,6 @@ The Apache Software License, Version 2.0
* FindBugs JSR305
- jsr305-3.0.2.jar
* Objenesis
- - objenesis-2.1.jar
- objenesis-2.6.jar
* Okio
- okio-1.13.0.jar
@@ -438,7 +437,7 @@ The Apache Software License, Version 2.0
- jackson-module-jaxb-annotations-2.8.11.jar
- jackson-module-jsonSchema-2.8.11.jar
* Java Assist
- - javassist-3.21.0-GA.jar
+ - javassist-3.25.0-GA.jar
* Jetty
- jetty-http-9.4.12.v20180830.jar
- jetty-io-9.4.12.v20180830.jar
diff --git a/pulsar-sql/presto-distribution/pom.xml b/pulsar-sql/presto-distribution/pom.xml
index 8e2b03a..1b92b80 100644
--- a/pulsar-sql/presto-distribution/pom.xml
+++ b/pulsar-sql/presto-distribution/pom.xml
@@ -237,6 +237,13 @@
<header>../../src/license-header.txt</header>
</configuration>
</plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ </configuration>
+ </plugin>
</plugins>
<extensions>
<extension>
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
index c38c0ac..fe3ac57 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
@@ -81,9 +81,9 @@ import static com.facebook.presto.spi.type.DateType.DATE;
import static com.facebook.presto.spi.type.TimeType.TIME;
import static com.facebook.presto.spi.type.TimestampType.TIMESTAMP;
import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
index a704be6..dfd723e 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
@@ -45,7 +45,7 @@ import java.util.Map;
import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
-import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
index b86a7a5..c02df94 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
@@ -42,8 +42,8 @@ import java.util.stream.Collectors;
import static com.facebook.presto.spi.type.DateTimeEncoding.packDateTimeWithZone;
import static com.facebook.presto.spi.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -291,5 +291,5 @@ public class TestPulsarSplitManager extends TestPulsarConnector {
}
}
-
+
}
diff --git a/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java b/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
index 5764ac7..f8a1aa0 100644
--- a/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
+++ b/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
@@ -18,8 +18,8 @@
*/
package org.apache.pulsar.storm;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -33,7 +33,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pulsar.client.api.ClientBuilder;
@@ -43,26 +42,20 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.MessageImpl;
-import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
-import org.apache.pulsar.shade.io.netty.buffer.PooledByteBufAllocator;
import org.apache.pulsar.storm.PulsarSpout.SpoutConsumer;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Values;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;
import com.google.common.collect.Maps;
public class PulsarSpoutTest {
- private static final Logger log = LoggerFactory.getLogger(PulsarSpoutTest.class);
-
@Test
public void testAckFailedMessage() throws Exception {
-
+
PulsarSpoutConfiguration conf = new PulsarSpoutConfiguration();
conf.setServiceUrl("http://localhost:8080");
conf.setSubscriptionName("sub1");
@@ -77,13 +70,13 @@ public class PulsarSpoutTest {
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
-
+
});
-
+
ClientBuilder builder = spy(new ClientBuilderImpl());
PulsarSpout spout = spy(new PulsarSpout(conf, builder));
-
- Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(), null, Schema.BYTES);
+
+ Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(), new byte[0], Schema.BYTES);
Consumer<byte[]> consumer = mock(Consumer.class);
SpoutConsumer spoutConsumer = new SpoutConsumer(consumer);
CompletableFuture<Void> future = new CompletableFuture<>();
@@ -92,13 +85,13 @@ public class PulsarSpoutTest {
Field consField = PulsarSpout.class.getDeclaredField("consumer");
consField.setAccessible(true);
consField.set(spout, spoutConsumer);
-
+
spout.fail(msg);
spout.ack(msg);
spout.emitNextAvailableTuple();
verify(consumer, atLeast(1)).receive(anyInt(), any());
}
-
+
@Test
public void testPulsarSpout() throws Exception {
PulsarSpoutConfiguration conf = new PulsarSpoutConfiguration();
@@ -138,9 +131,7 @@ public class PulsarSpoutTest {
when(client.getSharedConsumer(any())).thenReturn(consumer);
instances.put(componentId, client);
- ByteBuf data = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
- data.writeBytes("test".getBytes());
- Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(), data, Schema.BYTES);
+ Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(), "test".getBytes(), Schema.BYTES);
when(consumer.receive(anyInt(), any())).thenReturn(msg);
spout.open(config, context, collector);
@@ -149,5 +140,5 @@ public class PulsarSpoutTest {
assertTrue(called.get());
verify(consumer, atLeast(1)).receive(anyInt(), any());
}
-
+
}
diff --git a/tiered-storage/jcloud/pom.xml b/tiered-storage/jcloud/pom.xml
index 2967307..ba4fb0d 100644
--- a/tiered-storage/jcloud/pom.xml
+++ b/tiered-storage/jcloud/pom.xml
@@ -118,6 +118,11 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>javax.xml.bind</groupId>
+ <artifactId>jaxb-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>javax.annotation</groupId>
<artifactId>jsr250-api</artifactId>
<version>1.0</version>
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/BlobStoreBackedInputStreamTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/BlobStoreBackedInputStreamTest.java
index de311bc..16784a5 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/BlobStoreBackedInputStreamTest.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/BlobStoreBackedInputStreamTest.java
@@ -19,7 +19,7 @@
package org.apache.bookkeeper.mledger.offload.jcloud;
import static org.mockito.AdditionalAnswers.delegatesTo;
-import static org.mockito.Matchers.any;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -29,14 +29,14 @@ import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
+
import lombok.extern.slf4j.Slf4j;
+
import org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreBackedInputStreamImpl;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.domain.Blob;
-import org.jclouds.blobstore.options.GetOptions;
import org.jclouds.io.Payload;
import org.jclouds.io.Payloads;
-import org.mockito.Matchers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -222,7 +222,7 @@ class BlobStoreBackedInputStreamTest extends BlobStoreTestBase {
}
verify(spiedBlobStore, times(1))
- .getBlob(Mockito.eq(BUCKET), Mockito.eq(objectKey), Matchers.<GetOptions>anyObject());
+ .getBlob(Mockito.eq(BUCKET), Mockito.eq(objectKey), any());
}
@Test
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
index 07308ff..a3c4972 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
@@ -20,9 +20,9 @@ package org.apache.bookkeeper.mledger.offload.jcloud.impl;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.mockito.AdditionalAnswers.delegatesTo;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.mock;
import com.amazonaws.auth.AWSCredentials;
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java
index 5cf6bd5..2803b16 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java
@@ -25,8 +25,10 @@ import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
import com.google.common.io.ByteStreams;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
+
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+
import java.io.ByteArrayInputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
@@ -36,8 +38,9 @@ import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.stream.IntStream;
+
import lombok.Data;
-import lombok.extern.slf4j.Slf4j;
+
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerEntry;
@@ -47,7 +50,6 @@ import org.apache.bookkeeper.mledger.offload.jcloud.DataBlockHeader;
import org.testng.annotations.Test;
import org.testng.collections.Lists;
-@Slf4j
public class BlockAwareSegmentInputStreamTest {
private static final byte DEFAULT_ENTRY_BYTE = 0xB;