You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by da...@apache.org on 2022/07/25 08:58:02 UTC

[flink] 02/05: [FLINK-23528][connectors/kinesis] Use proper mock of ExecutorService in KinesisDataFetcherTest.

This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f2e265f81927d9871d408dec5d21ed6ebddd7910
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Wed Sep 8 18:22:34 2021 +0200

    [FLINK-23528][connectors/kinesis] Use proper mock of ExecutorService in KinesisDataFetcherTest.
    
    Mockito makes it so much harder to debug tests. Here, we replace Mockito with a small test class that emulates the previously dispersed functionality in a simpler way.
---
 .../kinesis/internals/KinesisDataFetcherTest.java  |  13 ++-
 .../testutils/FakeKinesisBehavioursFactory.java    |  25 ++---
 .../testutils/TestableKinesisDataFetcher.java      | 108 ++++++++++++++-------
 3 files changed, 94 insertions(+), 52 deletions(-)

diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
index c2d6dde7d96..8be2bb41097 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
@@ -17,6 +17,11 @@
 
 package org.apache.flink.streaming.connectors.kinesis.internals;
 
+import com.amazonaws.services.kinesis.model.HashKeyRange;
+import com.amazonaws.services.kinesis.model.SequenceNumberRange;
+import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.apache.commons.lang3.mutable.MutableLong;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.core.testutils.CheckedThread;
@@ -44,12 +49,6 @@ import org.apache.flink.streaming.connectors.kinesis.testutils.TestableKinesisDa
 import org.apache.flink.streaming.connectors.kinesis.testutils.TestableKinesisDataFetcherForShardConsumerException;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.TestLogger;
-
-import com.amazonaws.services.kinesis.model.HashKeyRange;
-import com.amazonaws.services.kinesis.model.SequenceNumberRange;
-import com.amazonaws.services.kinesis.model.Shard;
-import org.apache.commons.lang3.mutable.MutableBoolean;
-import org.apache.commons.lang3.mutable.MutableLong;
 import org.junit.Assert;
 import org.junit.Test;
 import org.powermock.reflect.Whitebox;
@@ -93,7 +92,7 @@ public class KinesisDataFetcherTest extends TestLogger {
         assertTrue(fetcher.isRunning());
     }
 
-    @Test(timeout = 10000)
+    @Test
     public void testIsRunningFalseAfterShutDown() throws InterruptedException {
         KinesisDataFetcher<String> fetcher =
                 createTestDataFetcherWithNoShards(10, 2, "test-stream");
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
index 1e737ac3358..46cc797ff7d 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
@@ -41,6 +41,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils.createDummyStreamShardHandle;
@@ -660,17 +661,19 @@ public class FakeKinesisBehavioursFactory {
                             shardIterator);
             List<Record> records = Collections.emptyList();
             try {
-                String data = queue.take();
-                Record record =
-                        new Record()
-                                .withData(
-                                        ByteBuffer.wrap(
-                                                data.getBytes(ConfigConstants.DEFAULT_CHARSET)))
-                                .withPartitionKey(UUID.randomUUID().toString())
-                                .withApproximateArrivalTimestamp(
-                                        new Date(System.currentTimeMillis()))
-                                .withSequenceNumber(String.valueOf(0));
-                records = Collections.singletonList(record);
+                String data = queue.poll(100, TimeUnit.MILLISECONDS);
+                if (data != null) {
+                    Record record =
+                            new Record()
+                                    .withData(
+                                            ByteBuffer.wrap(
+                                                    data.getBytes(ConfigConstants.DEFAULT_CHARSET)))
+                                    .withPartitionKey(UUID.randomUUID().toString())
+                                    .withApproximateArrivalTimestamp(
+                                            new Date(System.currentTimeMillis()))
+                                    .withSequenceNumber(String.valueOf(0));
+                    records = Collections.singletonList(record);
+                }
             } catch (InterruptedException e) {
                 shardIterator = null;
             }
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
index b4c69eebb8c..4bdb95eb78a 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
@@ -26,23 +26,19 @@ import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
 
-import org.mockito.invocation.InvocationOnMock;
-
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 /** Extension of the {@link KinesisDataFetcher} for testing. */
 public class TestableKinesisDataFetcher<T> extends KinesisDataFetcher<T> {
 
@@ -50,9 +46,6 @@ public class TestableKinesisDataFetcher<T> extends KinesisDataFetcher<T> {
     private final Semaphore discoveryWaiter = new Semaphore(0);
     private final OneShotLatch shutdownWaiter;
 
-    private volatile boolean running;
-    private volatile boolean executorServiceShutdownNowCalled;
-
     public TestableKinesisDataFetcher(
             List<String> fakeStreams,
             SourceFunction.SourceContext<T> sourceContext,
@@ -109,8 +102,6 @@ public class TestableKinesisDataFetcher<T> extends KinesisDataFetcher<T> {
 
         this.runWaiter = new OneShotLatch();
         this.shutdownWaiter = new OneShotLatch();
-
-        this.running = true;
     }
 
     @Override
@@ -130,28 +121,7 @@ public class TestableKinesisDataFetcher<T> extends KinesisDataFetcher<T> {
     @Override
     protected ExecutorService createShardConsumersThreadPool(String subtaskName) {
         // this is just a dummy fetcher, so no need to create a thread pool for shard consumers
-        ExecutorService mockExecutorService = mock(ExecutorService.class);
-        when(mockExecutorService.isTerminated())
-                .thenAnswer((InvocationOnMock invocation) -> !running);
-        when(mockExecutorService.shutdownNow())
-                .thenAnswer(
-                        invocationOnMock -> {
-                            executorServiceShutdownNowCalled = true;
-                            return Collections.emptyList();
-                        });
-        try {
-            when(mockExecutorService.awaitTermination(anyLong(), any()))
-                    .thenAnswer(invocationOnMock -> !running && executorServiceShutdownNowCalled);
-        } catch (InterruptedException e) {
-            // We're just trying to stub the method. Must acknowledge the checked exception.
-        }
-        return mockExecutorService;
-    }
-
-    @Override
-    public void awaitTermination() throws InterruptedException {
-        this.running = false;
-        super.awaitTermination();
+        return new TestExecutorService();
     }
 
     @Override
@@ -174,4 +144,74 @@ public class TestableKinesisDataFetcher<T> extends KinesisDataFetcher<T> {
     public void waitUntilDiscovery(int number) throws InterruptedException {
         discoveryWaiter.acquire(number);
     }
+
+    private static class TestExecutorService implements ExecutorService {
+        boolean terminated = false;
+
+        @Override
+        public void execute(Runnable command) {}
+
+        @Override
+        public void shutdown() {
+            terminated = true;
+        }
+
+        @Override
+        public List<Runnable> shutdownNow() {
+            terminated = true;
+            return Collections.emptyList();
+        }
+
+        @Override
+        public boolean isShutdown() {
+            return terminated;
+        }
+
+        @Override
+        public boolean isTerminated() {
+            return terminated;
+        }
+
+        @Override
+        public boolean awaitTermination(long timeout, TimeUnit unit) {
+            return terminated;
+        }
+
+        @Override
+        public <T> Future<T> submit(Callable<T> task) {
+            return null;
+        }
+
+        @Override
+        public <T> Future<T> submit(Runnable task, T result) {
+            return null;
+        }
+
+        @Override
+        public Future<?> submit(Runnable task) {
+            return null;
+        }
+
+        @Override
+        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
+            return null;
+        }
+
+        @Override
+        public <T> List<Future<T>> invokeAll(
+                Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) {
+            return null;
+        }
+
+        @Override
+        public <T> T invokeAny(Collection<? extends Callable<T>> tasks) {
+            return null;
+        }
+
+        @Override
+        public <T> T invokeAny(
+                Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) {
+            return null;
+        }
+    }
 }