You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2021/02/11 14:57:38 UTC
[flink] 01/02: [hotfix][tests] Import static constant in
SourceStreamTaskTest
This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit d342679a75e8886afbd9c763ca668be67ccc4385
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Wed Feb 10 14:37:17 2021 +0100
[hotfix][tests] Import static constant in SourceStreamTaskTest
---
.../runtime/tasks/SourceStreamTaskTest.java | 31 +++++++++++-----------
1 file changed, 15 insertions(+), 16 deletions(-)
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index d7cb5a7..9bcbf40 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -73,6 +73,7 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
import static org.apache.flink.util.Preconditions.checkState;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertArrayEquals;
@@ -90,7 +91,7 @@ public class SourceStreamTaskTest {
@Test
public void testOpenClose() throws Exception {
final StreamTaskTestHarness<String> testHarness =
- new StreamTaskTestHarness<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
+ new StreamTaskTestHarness<>(SourceStreamTask::new, STRING_TYPE_INFO);
testHarness.setupOutputForSingletonOperatorChain();
@@ -113,8 +114,7 @@ public class SourceStreamTaskTest {
public void testStartDelayMetric() throws Exception {
long sleepTime = 42;
StreamTaskMailboxTestHarnessBuilder<String> builder =
- new StreamTaskMailboxTestHarnessBuilder<>(
- SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
+ new StreamTaskMailboxTestHarnessBuilder<>(SourceStreamTask::new, STRING_TYPE_INFO);
final Map<String, Metric> metrics = new ConcurrentHashMap<>();
final TaskMetricGroup taskMetricGroup =
@@ -124,7 +124,7 @@ public class SourceStreamTaskTest {
builder.setupOutputForSingletonOperatorChain(
new StreamSource<>(
new CancelTestSource(
- BasicTypeInfo.STRING_TYPE_INFO.createSerializer(
+ STRING_TYPE_INFO.createSerializer(
new ExecutionConfig()),
"Hello")))
.setTaskMetricGroup(taskMetricGroup)
@@ -227,7 +227,7 @@ public class SourceStreamTaskTest {
@Test
public void testClosingAllOperatorsOnChainProperly() throws Exception {
final StreamTaskTestHarness<String> testHarness =
- new StreamTaskTestHarness<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
+ new StreamTaskTestHarness<>(SourceStreamTask::new, STRING_TYPE_INFO);
testHarness
.setupOperatorChain(
@@ -238,7 +238,7 @@ public class SourceStreamTaskTest {
.chain(
new OperatorID(),
new TestBoundedOneInputStreamOperator("Operator1"),
- BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
+ STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
.finish();
StreamConfig streamConfig = testHarness.getStreamConfig();
@@ -263,20 +263,19 @@ public class SourceStreamTaskTest {
@Test
public void testNotMarkingEndOfInputWhenTaskCancelled() throws Exception {
final StreamTaskTestHarness<String> testHarness =
- new StreamTaskTestHarness<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
+ new StreamTaskTestHarness<>(SourceStreamTask::new, STRING_TYPE_INFO);
testHarness
.setupOperatorChain(
new OperatorID(),
new StreamSource<>(
new CancelTestSource(
- BasicTypeInfo.STRING_TYPE_INFO.createSerializer(
- new ExecutionConfig()),
+ STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
"Hello")))
.chain(
new OperatorID(),
new TestBoundedOneInputStreamOperator("Operator1"),
- BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
+ STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
.finish();
StreamConfig streamConfig = testHarness.getStreamConfig();
@@ -331,7 +330,7 @@ public class SourceStreamTaskTest {
public void testCancellationWithSourceBlockedOnLock(
boolean withPendingMail, boolean throwInCancel) throws Exception {
final StreamTaskTestHarness<String> testHarness =
- new StreamTaskTestHarness<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
+ new StreamTaskTestHarness<>(SourceStreamTask::new, STRING_TYPE_INFO);
CancelLockingSource.reset();
testHarness
@@ -341,7 +340,7 @@ public class SourceStreamTaskTest {
.chain(
new OperatorID(),
new TestBoundedOneInputStreamOperator("Operator1"),
- BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
+ STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
.finish();
StreamConfig streamConfig = testHarness.getStreamConfig();
@@ -444,7 +443,7 @@ public class SourceStreamTaskTest {
private void testInterruptionExceptionNotSwallowed(
InterruptedSource.ExceptionGenerator exceptionGenerator) throws Exception {
final StreamTaskTestHarness<String> testHarness =
- new StreamTaskTestHarness<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
+ new StreamTaskTestHarness<>(SourceStreamTask::new, STRING_TYPE_INFO);
CancelLockingSource.reset();
testHarness
@@ -454,7 +453,7 @@ public class SourceStreamTaskTest {
.chain(
new OperatorID(),
new TestBoundedOneInputStreamOperator("Operator1"),
- BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
+ STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
.finish();
StreamConfig streamConfig = testHarness.getStreamConfig();
@@ -498,7 +497,7 @@ public class SourceStreamTaskTest {
@Test
public void finishingIgnoresExceptions() throws Exception {
final StreamTaskTestHarness<String> testHarness =
- new StreamTaskTestHarness<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
+ new StreamTaskTestHarness<>(SourceStreamTask::new, STRING_TYPE_INFO);
final CompletableFuture<Void> operatorRunningWaitingFuture = new CompletableFuture<>();
ExceptionThrowingSource.setIsInRunLoopFuture(operatorRunningWaitingFuture);
@@ -518,7 +517,7 @@ public class SourceStreamTaskTest {
@Test
public void testWaitsForSourceThreadOnCancel() throws Exception {
StreamTaskTestHarness<String> harness =
- new StreamTaskTestHarness<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
+ new StreamTaskTestHarness<>(SourceStreamTask::new, STRING_TYPE_INFO);
harness.setupOutputForSingletonOperatorChain();
harness.getStreamConfig().setStreamOperator(new StreamSource<>(new NonStoppingSource()));