You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/03/25 14:07:09 UTC
[flink] 04/11: [hotfix][test] Fix formatting in AbstractStreamOperatorTest
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 63d6add140a3146fe4034515b458fe9bd6a83b97
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Wed Mar 11 09:34:47 2020 +0100
[hotfix][test] Fix formatting in AbstractStreamOperatorTest
---
.../api/operators/AbstractStreamOperatorTest.java | 109 ++++++++++-----------
1 file changed, 54 insertions(+), 55 deletions(-)
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
index f9d9aa5..93dca8f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -84,13 +84,12 @@ import static org.powermock.api.mockito.PowerMockito.whenNew;
@PrepareForTest(AbstractStreamOperator.class)
@PowerMockIgnore({"java.*", "javax.*", "org.slf4j.*", "org.apache.log4j.*"})
public class AbstractStreamOperatorTest {
-
@Test
public void testStateDoesNotInterfere() throws Exception {
TestOperator testOperator = new TestOperator();
KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness =
- new KeyedOneInputStreamOperatorTestHarness<>(testOperator, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
+ new KeyedOneInputStreamOperatorTestHarness<>(testOperator, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
testHarness.open();
@@ -101,8 +100,8 @@ public class AbstractStreamOperatorTest {
testHarness.processElement(new Tuple2<>(0, "EMIT_STATE"), 0);
assertThat(
- extractResult(testHarness),
- contains("ON_ELEMENT:1:CIAO", "ON_ELEMENT:0:HELLO"));
+ extractResult(testHarness),
+ contains("ON_ELEMENT:1:CIAO", "ON_ELEMENT:0:HELLO"));
}
/**
@@ -114,7 +113,7 @@ public class AbstractStreamOperatorTest {
TestOperator testOperator = new TestOperator();
KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness =
- new KeyedOneInputStreamOperatorTestHarness<>(testOperator, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
+ new KeyedOneInputStreamOperatorTestHarness<>(testOperator, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
testHarness.open();
@@ -130,14 +129,14 @@ public class AbstractStreamOperatorTest {
testHarness.processWatermark(10L);
assertThat(
- extractResult(testHarness),
- contains("ON_EVENT_TIME:HELLO"));
+ extractResult(testHarness),
+ contains("ON_EVENT_TIME:HELLO"));
testHarness.processWatermark(20L);
assertThat(
- extractResult(testHarness),
- contains("ON_EVENT_TIME:CIAO"));
+ extractResult(testHarness),
+ contains("ON_EVENT_TIME:CIAO"));
}
/**
@@ -149,7 +148,7 @@ public class AbstractStreamOperatorTest {
TestOperator testOperator = new TestOperator();
KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness =
- new KeyedOneInputStreamOperatorTestHarness<>(testOperator, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
+ new KeyedOneInputStreamOperatorTestHarness<>(testOperator, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
testHarness.open();
@@ -165,14 +164,14 @@ public class AbstractStreamOperatorTest {
testHarness.setProcessingTime(10L);
assertThat(
- extractResult(testHarness),
- contains("ON_PROC_TIME:HELLO"));
+ extractResult(testHarness),
+ contains("ON_PROC_TIME:HELLO"));
testHarness.setProcessingTime(20L);
assertThat(
- extractResult(testHarness),
- contains("ON_PROC_TIME:CIAO"));
+ extractResult(testHarness),
+ contains("ON_PROC_TIME:CIAO"));
}
/**
@@ -183,7 +182,7 @@ public class AbstractStreamOperatorTest {
TestOperator testOperator = new TestOperator();
KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness =
- new KeyedOneInputStreamOperatorTestHarness<>(testOperator, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
+ new KeyedOneInputStreamOperatorTestHarness<>(testOperator, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
testHarness.open();
@@ -201,10 +200,10 @@ public class AbstractStreamOperatorTest {
TestOperator testOperator1 = new TestOperator();
KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness1 =
- new KeyedOneInputStreamOperatorTestHarness<>(
- testOperator1,
- new TestKeySelector(),
- BasicTypeInfo.INT_TYPE_INFO);
+ new KeyedOneInputStreamOperatorTestHarness<>(
+ testOperator1,
+ new TestKeySelector(),
+ BasicTypeInfo.INT_TYPE_INFO);
testHarness1.setProcessingTime(0L);
@@ -215,14 +214,14 @@ public class AbstractStreamOperatorTest {
testHarness1.setProcessingTime(10L);
assertThat(
- extractResult(testHarness1),
- contains("ON_PROC_TIME:HELLO"));
+ extractResult(testHarness1),
+ contains("ON_PROC_TIME:HELLO"));
testHarness1.setProcessingTime(20L);
assertThat(
- extractResult(testHarness1),
- contains("ON_PROC_TIME:CIAO"));
+ extractResult(testHarness1),
+ contains("ON_PROC_TIME:CIAO"));
}
@@ -234,7 +233,7 @@ public class AbstractStreamOperatorTest {
TestOperator testOperator = new TestOperator();
KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness =
- new KeyedOneInputStreamOperatorTestHarness<>(testOperator, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
+ new KeyedOneInputStreamOperatorTestHarness<>(testOperator, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
testHarness.open();
@@ -249,14 +248,14 @@ public class AbstractStreamOperatorTest {
testHarness.processWatermark(20L);
assertThat(
- extractResult(testHarness),
- contains("ON_EVENT_TIME:HELLO"));
+ extractResult(testHarness),
+ contains("ON_EVENT_TIME:HELLO"));
testHarness.setProcessingTime(10L);
assertThat(
- extractResult(testHarness),
- contains("ON_PROC_TIME:HELLO"));
+ extractResult(testHarness),
+ contains("ON_PROC_TIME:HELLO"));
}
/**
@@ -281,13 +280,13 @@ public class AbstractStreamOperatorTest {
TestOperator testOperator = new TestOperator();
KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness =
- new KeyedOneInputStreamOperatorTestHarness<>(
- testOperator,
- new TestKeySelector(),
- BasicTypeInfo.INT_TYPE_INFO,
- maxParallelism,
- 1, /* num subtasks */
- 0 /* subtask index */);
+ new KeyedOneInputStreamOperatorTestHarness<>(
+ testOperator,
+ new TestKeySelector(),
+ BasicTypeInfo.INT_TYPE_INFO,
+ maxParallelism,
+ 1, /* num subtasks */
+ 0 /* subtask index */);
testHarness.open();
@@ -314,13 +313,13 @@ public class AbstractStreamOperatorTest {
TestOperator testOperator1 = new TestOperator();
KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness1 =
- new KeyedOneInputStreamOperatorTestHarness<>(
- testOperator1,
- new TestKeySelector(),
- BasicTypeInfo.INT_TYPE_INFO,
- maxParallelism,
- 2, /* num subtasks */
- 0 /* subtask index */);
+ new KeyedOneInputStreamOperatorTestHarness<>(
+ testOperator1,
+ new TestKeySelector(),
+ BasicTypeInfo.INT_TYPE_INFO,
+ maxParallelism,
+ 2, /* num subtasks */
+ 0 /* subtask index */);
testHarness1.setup();
testHarness1.initializeState(initState1);
@@ -357,13 +356,13 @@ public class AbstractStreamOperatorTest {
TestOperator testOperator2 = new TestOperator();
KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness2 =
- new KeyedOneInputStreamOperatorTestHarness<>(
- testOperator2,
- new TestKeySelector(),
- BasicTypeInfo.INT_TYPE_INFO,
- maxParallelism,
- 2, /* num subtasks */
- 1 /* subtask index */);
+ new KeyedOneInputStreamOperatorTestHarness<>(
+ testOperator2,
+ new TestKeySelector(),
+ BasicTypeInfo.INT_TYPE_INFO,
+ maxParallelism,
+ 2, /* num subtasks */
+ 1 /* subtask index */);
testHarness2.setup();
testHarness2.initializeState(initState2);
@@ -687,24 +686,24 @@ public class AbstractStreamOperatorTest {
* state or setting timers.
*/
private static class TestOperator
- extends AbstractStreamOperator<String>
- implements OneInputStreamOperator<Tuple2<Integer, String>, String>, Triggerable<Integer, VoidNamespace> {
+ extends AbstractStreamOperator<String>
+ implements OneInputStreamOperator<Tuple2<Integer, String>, String>, Triggerable<Integer, VoidNamespace> {
private static final long serialVersionUID = 1L;
private transient InternalTimerService<VoidNamespace> timerService;
private final ValueStateDescriptor<String> stateDescriptor =
- new ValueStateDescriptor<>("state", StringSerializer.INSTANCE);
+ new ValueStateDescriptor<>("state", StringSerializer.INSTANCE);
@Override
public void open() throws Exception {
super.open();
this.timerService = getInternalTimerService(
- "test-timers",
- VoidNamespaceSerializer.INSTANCE,
- this);
+ "test-timers",
+ VoidNamespaceSerializer.INSTANCE,
+ this);
}
@Override