You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/09/15 20:13:54 UTC
[flink] 06/11: [hotfix][tests] Move constants in SplitFetcherTest
relevant to only one test into test method
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3b2f54bcb437f98e6137c904045cc51072b5c06b
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Sep 14 19:47:40 2020 +0200
[hotfix][tests] Move constants in SplitFetcherTest relevant to only one test into test method
---
.../source/reader/fetcher/SplitFetcherTest.java | 27 +++++++++++-----------
1 file changed, 14 insertions(+), 13 deletions(-)
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
index eef8328..4fa99dd 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
@@ -41,13 +41,14 @@ import static org.junit.Assert.assertTrue;
* Unit test for {@link SplitFetcher}.
*/
public class SplitFetcherTest {
- private static final int NUM_SPLITS = 3;
- private static final int NUM_RECORDS_PER_SPLIT = 10_000;
- private static final int INTERRUPT_RECORDS_INTERVAL = 10;
- private static final int NUM_TOTAL_RECORDS = NUM_RECORDS_PER_SPLIT * NUM_SPLITS;
@Test
public void testWakeup() throws InterruptedException {
+ final int numSplits = 3;
+ final int numRecordsPerSplit = 10_000;
+ final int interruptRecordsInterval = 10;
+ final int numTotalRecords = numRecordsPerSplit * numSplits;
+
FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementQueue =
new FutureCompletingBlockingQueue<>(new FutureNotifier(), 1);
SplitFetcher<int[], MockSourceSplit> fetcher =
@@ -59,10 +60,10 @@ public class SplitFetcherTest {
// Prepare the splits.
List<MockSourceSplit> splits = new ArrayList<>();
- for (int i = 0; i < NUM_SPLITS; i++) {
- splits.add(new MockSourceSplit(i, 0, NUM_RECORDS_PER_SPLIT));
- int base = i * NUM_RECORDS_PER_SPLIT;
- for (int j = base; j < base + NUM_RECORDS_PER_SPLIT; j++) {
+ for (int i = 0; i < numSplits; i++) {
+ splits.add(new MockSourceSplit(i, 0, numRecordsPerSplit));
+ int base = i * numRecordsPerSplit;
+ for (int j = base; j < base + numRecordsPerSplit; j++) {
splits.get(splits.size() - 1).addRecord(j);
}
}
@@ -81,9 +82,9 @@ public class SplitFetcherTest {
@Override
public void run() {
int lastInterrupt = 0;
- while (recordsRead.size() < NUM_TOTAL_RECORDS && !stop.get()) {
+ while (recordsRead.size() < numTotalRecords && !stop.get()) {
int numRecordsRead = recordsRead.size();
- if (numRecordsRead >= lastInterrupt + INTERRUPT_RECORDS_INTERVAL) {
+ if (numRecordsRead >= lastInterrupt + interruptRecordsInterval) {
fetcher.wakeUp(false);
wakeupTimes.incrementAndGet();
lastInterrupt = numRecordsRead;
@@ -96,7 +97,7 @@ public class SplitFetcherTest {
fetcherThread.start();
interrupter.start();
- while (recordsRead.size() < NUM_SPLITS * NUM_RECORDS_PER_SPLIT) {
+ while (recordsRead.size() < numSplits * numRecordsPerSplit) {
final RecordsWithSplitIds<int[]> nextBatch = elementQueue.take();
while (nextBatch.nextSplit() != null) {
int[] arr;
@@ -106,9 +107,9 @@ public class SplitFetcherTest {
}
}
- assertEquals(NUM_TOTAL_RECORDS, recordsRead.size());
+ assertEquals(numTotalRecords, recordsRead.size());
assertEquals(0, (int) recordsRead.first());
- assertEquals(NUM_TOTAL_RECORDS - 1, (int) recordsRead.last());
+ assertEquals(numTotalRecords - 1, (int) recordsRead.last());
assertTrue(wakeupTimes.get() > 0);
} finally {
stop.set(true);