You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2021/02/11 14:57:38 UTC

[flink] 01/02: [hotfix][tests] Import static constant in SourceStreamTaskTest

This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d342679a75e8886afbd9c763ca668be67ccc4385
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Wed Feb 10 14:37:17 2021 +0100

    [hotfix][tests] Import static constant in SourceStreamTaskTest
---
 .../runtime/tasks/SourceStreamTaskTest.java        | 31 +++++++++++-----------
 1 file changed, 15 insertions(+), 16 deletions(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index d7cb5a7..9bcbf40 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -73,6 +73,7 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.junit.Assert.assertArrayEquals;
@@ -90,7 +91,7 @@ public class SourceStreamTaskTest {
     @Test
     public void testOpenClose() throws Exception {
         final StreamTaskTestHarness<String> testHarness =
-                new StreamTaskTestHarness<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
+                new StreamTaskTestHarness<>(SourceStreamTask::new, STRING_TYPE_INFO);
 
         testHarness.setupOutputForSingletonOperatorChain();
 
@@ -113,8 +114,7 @@ public class SourceStreamTaskTest {
     public void testStartDelayMetric() throws Exception {
         long sleepTime = 42;
         StreamTaskMailboxTestHarnessBuilder<String> builder =
-                new StreamTaskMailboxTestHarnessBuilder<>(
-                        SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
+                new StreamTaskMailboxTestHarnessBuilder<>(SourceStreamTask::new, STRING_TYPE_INFO);
 
         final Map<String, Metric> metrics = new ConcurrentHashMap<>();
         final TaskMetricGroup taskMetricGroup =
@@ -124,7 +124,7 @@ public class SourceStreamTaskTest {
                 builder.setupOutputForSingletonOperatorChain(
                                 new StreamSource<>(
                                         new CancelTestSource(
-                                                BasicTypeInfo.STRING_TYPE_INFO.createSerializer(
+                                                STRING_TYPE_INFO.createSerializer(
                                                         new ExecutionConfig()),
                                                 "Hello")))
                         .setTaskMetricGroup(taskMetricGroup)
@@ -227,7 +227,7 @@ public class SourceStreamTaskTest {
     @Test
     public void testClosingAllOperatorsOnChainProperly() throws Exception {
         final StreamTaskTestHarness<String> testHarness =
-                new StreamTaskTestHarness<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
+                new StreamTaskTestHarness<>(SourceStreamTask::new, STRING_TYPE_INFO);
 
         testHarness
                 .setupOperatorChain(
@@ -238,7 +238,7 @@ public class SourceStreamTaskTest {
                 .chain(
                         new OperatorID(),
                         new TestBoundedOneInputStreamOperator("Operator1"),
-                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                        STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
                 .finish();
 
         StreamConfig streamConfig = testHarness.getStreamConfig();
@@ -263,20 +263,19 @@ public class SourceStreamTaskTest {
     @Test
     public void testNotMarkingEndOfInputWhenTaskCancelled() throws Exception {
         final StreamTaskTestHarness<String> testHarness =
-                new StreamTaskTestHarness<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
+                new StreamTaskTestHarness<>(SourceStreamTask::new, STRING_TYPE_INFO);
 
         testHarness
                 .setupOperatorChain(
                         new OperatorID(),
                         new StreamSource<>(
                                 new CancelTestSource(
-                                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(
-                                                new ExecutionConfig()),
+                                        STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                                         "Hello")))
                 .chain(
                         new OperatorID(),
                         new TestBoundedOneInputStreamOperator("Operator1"),
-                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                        STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
                 .finish();
 
         StreamConfig streamConfig = testHarness.getStreamConfig();
@@ -331,7 +330,7 @@ public class SourceStreamTaskTest {
     public void testCancellationWithSourceBlockedOnLock(
             boolean withPendingMail, boolean throwInCancel) throws Exception {
         final StreamTaskTestHarness<String> testHarness =
-                new StreamTaskTestHarness<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
+                new StreamTaskTestHarness<>(SourceStreamTask::new, STRING_TYPE_INFO);
 
         CancelLockingSource.reset();
         testHarness
@@ -341,7 +340,7 @@ public class SourceStreamTaskTest {
                 .chain(
                         new OperatorID(),
                         new TestBoundedOneInputStreamOperator("Operator1"),
-                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                        STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
                 .finish();
 
         StreamConfig streamConfig = testHarness.getStreamConfig();
@@ -444,7 +443,7 @@ public class SourceStreamTaskTest {
     private void testInterruptionExceptionNotSwallowed(
             InterruptedSource.ExceptionGenerator exceptionGenerator) throws Exception {
         final StreamTaskTestHarness<String> testHarness =
-                new StreamTaskTestHarness<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
+                new StreamTaskTestHarness<>(SourceStreamTask::new, STRING_TYPE_INFO);
 
         CancelLockingSource.reset();
         testHarness
@@ -454,7 +453,7 @@ public class SourceStreamTaskTest {
                 .chain(
                         new OperatorID(),
                         new TestBoundedOneInputStreamOperator("Operator1"),
-                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                        STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
                 .finish();
 
         StreamConfig streamConfig = testHarness.getStreamConfig();
@@ -498,7 +497,7 @@ public class SourceStreamTaskTest {
     @Test
     public void finishingIgnoresExceptions() throws Exception {
         final StreamTaskTestHarness<String> testHarness =
-                new StreamTaskTestHarness<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
+                new StreamTaskTestHarness<>(SourceStreamTask::new, STRING_TYPE_INFO);
 
         final CompletableFuture<Void> operatorRunningWaitingFuture = new CompletableFuture<>();
         ExceptionThrowingSource.setIsInRunLoopFuture(operatorRunningWaitingFuture);
@@ -518,7 +517,7 @@ public class SourceStreamTaskTest {
     @Test
     public void testWaitsForSourceThreadOnCancel() throws Exception {
         StreamTaskTestHarness<String> harness =
-                new StreamTaskTestHarness<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
+                new StreamTaskTestHarness<>(SourceStreamTask::new, STRING_TYPE_INFO);
 
         harness.setupOutputForSingletonOperatorChain();
         harness.getStreamConfig().setStreamOperator(new StreamSource<>(new NonStoppingSource()));