You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by da...@apache.org on 2022/07/25 08:58:02 UTC
[flink] 02/05: [FLINK-23528][connectors/kinesis] Use proper mock of ExecutorService in KinesisDataFetcherTest.
This is an automated email from the ASF dual-hosted git repository.
dannycranmer pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
commit f2e265f81927d9871d408dec5d21ed6ebddd7910
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Wed Sep 8 18:22:34 2021 +0200
[FLINK-23528][connectors/kinesis] Use proper mock of ExecutorService in KinesisDataFetcherTest.
Mockito makes it so much harder to debug tests. Here, we replace Mockito with a small test class that emulates the previously dispersed functionality in a simpler way.
---
.../kinesis/internals/KinesisDataFetcherTest.java | 13 ++-
.../testutils/FakeKinesisBehavioursFactory.java | 25 ++---
.../testutils/TestableKinesisDataFetcher.java | 108 ++++++++++++++-------
3 files changed, 94 insertions(+), 52 deletions(-)
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
index c2d6dde7d96..8be2bb41097 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
@@ -17,6 +17,11 @@
package org.apache.flink.streaming.connectors.kinesis.internals;
+import com.amazonaws.services.kinesis.model.HashKeyRange;
+import com.amazonaws.services.kinesis.model.SequenceNumberRange;
+import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.testutils.CheckedThread;
@@ -44,12 +49,6 @@ import org.apache.flink.streaming.connectors.kinesis.testutils.TestableKinesisDa
import org.apache.flink.streaming.connectors.kinesis.testutils.TestableKinesisDataFetcherForShardConsumerException;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.TestLogger;
-
-import com.amazonaws.services.kinesis.model.HashKeyRange;
-import com.amazonaws.services.kinesis.model.SequenceNumberRange;
-import com.amazonaws.services.kinesis.model.Shard;
-import org.apache.commons.lang3.mutable.MutableBoolean;
-import org.apache.commons.lang3.mutable.MutableLong;
import org.junit.Assert;
import org.junit.Test;
import org.powermock.reflect.Whitebox;
@@ -93,7 +92,7 @@ public class KinesisDataFetcherTest extends TestLogger {
assertTrue(fetcher.isRunning());
}
- @Test(timeout = 10000)
+ @Test
public void testIsRunningFalseAfterShutDown() throws InterruptedException {
KinesisDataFetcher<String> fetcher =
createTestDataFetcherWithNoShards(10, 2, "test-stream");
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
index 1e737ac3358..46cc797ff7d 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
@@ -41,6 +41,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils.createDummyStreamShardHandle;
@@ -660,17 +661,19 @@ public class FakeKinesisBehavioursFactory {
shardIterator);
List<Record> records = Collections.emptyList();
try {
- String data = queue.take();
- Record record =
- new Record()
- .withData(
- ByteBuffer.wrap(
- data.getBytes(ConfigConstants.DEFAULT_CHARSET)))
- .withPartitionKey(UUID.randomUUID().toString())
- .withApproximateArrivalTimestamp(
- new Date(System.currentTimeMillis()))
- .withSequenceNumber(String.valueOf(0));
- records = Collections.singletonList(record);
+ String data = queue.poll(100, TimeUnit.MILLISECONDS);
+ if (data != null) {
+ Record record =
+ new Record()
+ .withData(
+ ByteBuffer.wrap(
+ data.getBytes(ConfigConstants.DEFAULT_CHARSET)))
+ .withPartitionKey(UUID.randomUUID().toString())
+ .withApproximateArrivalTimestamp(
+ new Date(System.currentTimeMillis()))
+ .withSequenceNumber(String.valueOf(0));
+ records = Collections.singletonList(record);
+ }
} catch (InterruptedException e) {
shardIterator = null;
}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
index b4c69eebb8c..4bdb95eb78a 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
@@ -26,23 +26,19 @@ import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
-import org.mockito.invocation.InvocationOnMock;
-
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
/** Extension of the {@link KinesisDataFetcher} for testing. */
public class TestableKinesisDataFetcher<T> extends KinesisDataFetcher<T> {
@@ -50,9 +46,6 @@ public class TestableKinesisDataFetcher<T> extends KinesisDataFetcher<T> {
private final Semaphore discoveryWaiter = new Semaphore(0);
private final OneShotLatch shutdownWaiter;
- private volatile boolean running;
- private volatile boolean executorServiceShutdownNowCalled;
-
public TestableKinesisDataFetcher(
List<String> fakeStreams,
SourceFunction.SourceContext<T> sourceContext,
@@ -109,8 +102,6 @@ public class TestableKinesisDataFetcher<T> extends KinesisDataFetcher<T> {
this.runWaiter = new OneShotLatch();
this.shutdownWaiter = new OneShotLatch();
-
- this.running = true;
}
@Override
@@ -130,28 +121,7 @@ public class TestableKinesisDataFetcher<T> extends KinesisDataFetcher<T> {
@Override
protected ExecutorService createShardConsumersThreadPool(String subtaskName) {
// this is just a dummy fetcher, so no need to create a thread pool for shard consumers
- ExecutorService mockExecutorService = mock(ExecutorService.class);
- when(mockExecutorService.isTerminated())
- .thenAnswer((InvocationOnMock invocation) -> !running);
- when(mockExecutorService.shutdownNow())
- .thenAnswer(
- invocationOnMock -> {
- executorServiceShutdownNowCalled = true;
- return Collections.emptyList();
- });
- try {
- when(mockExecutorService.awaitTermination(anyLong(), any()))
- .thenAnswer(invocationOnMock -> !running && executorServiceShutdownNowCalled);
- } catch (InterruptedException e) {
- // We're just trying to stub the method. Must acknowledge the checked exception.
- }
- return mockExecutorService;
- }
-
- @Override
- public void awaitTermination() throws InterruptedException {
- this.running = false;
- super.awaitTermination();
+ return new TestExecutorService();
}
@Override
@@ -174,4 +144,74 @@ public class TestableKinesisDataFetcher<T> extends KinesisDataFetcher<T> {
public void waitUntilDiscovery(int number) throws InterruptedException {
discoveryWaiter.acquire(number);
}
+
+ private static class TestExecutorService implements ExecutorService {
+ boolean terminated = false;
+
+ @Override
+ public void execute(Runnable command) {}
+
+ @Override
+ public void shutdown() {
+ terminated = true;
+ }
+
+ @Override
+ public List<Runnable> shutdownNow() {
+ terminated = true;
+ return Collections.emptyList();
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return terminated;
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return terminated;
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) {
+ return terminated;
+ }
+
+ @Override
+ public <T> Future<T> submit(Callable<T> task) {
+ return null;
+ }
+
+ @Override
+ public <T> Future<T> submit(Runnable task, T result) {
+ return null;
+ }
+
+ @Override
+ public Future<?> submit(Runnable task) {
+ return null;
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
+ return null;
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(
+ Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) {
+ return null;
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks) {
+ return null;
+ }
+
+ @Override
+ public <T> T invokeAny(
+ Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) {
+ return null;
+ }
+ }
}