You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/09/15 20:13:54 UTC

[flink] 06/11: [hotfix][tests] Move constants in SplitFetcherTest relevant to only one test into test method

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3b2f54bcb437f98e6137c904045cc51072b5c06b
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Sep 14 19:47:40 2020 +0200

    [hotfix][tests] Move constants in SplitFetcherTest relevant to only one test into test method
---
 .../source/reader/fetcher/SplitFetcherTest.java    | 27 +++++++++++-----------
 1 file changed, 14 insertions(+), 13 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
index eef8328..4fa99dd 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
@@ -41,13 +41,14 @@ import static org.junit.Assert.assertTrue;
  * Unit test for {@link SplitFetcher}.
  */
 public class SplitFetcherTest {
-	private static final int NUM_SPLITS = 3;
-	private static final int NUM_RECORDS_PER_SPLIT = 10_000;
-	private static final int INTERRUPT_RECORDS_INTERVAL = 10;
-	private static final int NUM_TOTAL_RECORDS = NUM_RECORDS_PER_SPLIT * NUM_SPLITS;
 
 	@Test
 	public void testWakeup() throws InterruptedException {
+		final int numSplits = 3;
+		final int numRecordsPerSplit = 10_000;
+		final int interruptRecordsInterval = 10;
+		final int numTotalRecords = numRecordsPerSplit * numSplits;
+
 		FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementQueue =
 			new FutureCompletingBlockingQueue<>(new FutureNotifier(), 1);
 		SplitFetcher<int[], MockSourceSplit> fetcher =
@@ -59,10 +60,10 @@ public class SplitFetcherTest {
 
 		// Prepare the splits.
 		List<MockSourceSplit> splits = new ArrayList<>();
-		for (int i = 0; i < NUM_SPLITS; i++) {
-			splits.add(new MockSourceSplit(i, 0, NUM_RECORDS_PER_SPLIT));
-			int base = i * NUM_RECORDS_PER_SPLIT;
-			for (int j = base; j < base + NUM_RECORDS_PER_SPLIT; j++) {
+		for (int i = 0; i < numSplits; i++) {
+			splits.add(new MockSourceSplit(i, 0, numRecordsPerSplit));
+			int base = i * numRecordsPerSplit;
+			for (int j = base; j < base + numRecordsPerSplit; j++) {
 				splits.get(splits.size() - 1).addRecord(j);
 			}
 		}
@@ -81,9 +82,9 @@ public class SplitFetcherTest {
 			@Override
 			public void run() {
 				int lastInterrupt = 0;
-				while (recordsRead.size() < NUM_TOTAL_RECORDS && !stop.get()) {
+				while (recordsRead.size() < numTotalRecords && !stop.get()) {
 					int numRecordsRead = recordsRead.size();
-					if (numRecordsRead >= lastInterrupt + INTERRUPT_RECORDS_INTERVAL) {
+					if (numRecordsRead >= lastInterrupt + interruptRecordsInterval) {
 						fetcher.wakeUp(false);
 						wakeupTimes.incrementAndGet();
 						lastInterrupt = numRecordsRead;
@@ -96,7 +97,7 @@ public class SplitFetcherTest {
 			fetcherThread.start();
 			interrupter.start();
 
-			while (recordsRead.size() < NUM_SPLITS * NUM_RECORDS_PER_SPLIT) {
+			while (recordsRead.size() < numSplits * numRecordsPerSplit) {
 				final RecordsWithSplitIds<int[]> nextBatch = elementQueue.take();
 				while (nextBatch.nextSplit() != null) {
 					int[] arr;
@@ -106,9 +107,9 @@ public class SplitFetcherTest {
 				}
 			}
 
-			assertEquals(NUM_TOTAL_RECORDS, recordsRead.size());
+			assertEquals(numTotalRecords, recordsRead.size());
 			assertEquals(0, (int) recordsRead.first());
-			assertEquals(NUM_TOTAL_RECORDS - 1, (int) recordsRead.last());
+			assertEquals(numTotalRecords - 1, (int) recordsRead.last());
 			assertTrue(wakeupTimes.get() > 0);
 		} finally {
 			stop.set(true);