You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/03/25 14:07:09 UTC

[flink] 04/11: [hotfix][test] Fix formatting in AbstractStreamOperatorTest

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 63d6add140a3146fe4034515b458fe9bd6a83b97
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Wed Mar 11 09:34:47 2020 +0100

    [hotfix][test] Fix formatting in AbstractStreamOperatorTest
---
 .../api/operators/AbstractStreamOperatorTest.java  | 109 ++++++++++-----------
 1 file changed, 54 insertions(+), 55 deletions(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
index f9d9aa5..93dca8f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -84,13 +84,12 @@ import static org.powermock.api.mockito.PowerMockito.whenNew;
 @PrepareForTest(AbstractStreamOperator.class)
 @PowerMockIgnore({"java.*", "javax.*", "org.slf4j.*", "org.apache.log4j.*"})
 public class AbstractStreamOperatorTest {
-
 	@Test
 	public void testStateDoesNotInterfere() throws Exception {
 		TestOperator testOperator = new TestOperator();
 
 		KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(testOperator, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
+			new KeyedOneInputStreamOperatorTestHarness<>(testOperator, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
 
 		testHarness.open();
 
@@ -101,8 +100,8 @@ public class AbstractStreamOperatorTest {
 		testHarness.processElement(new Tuple2<>(0, "EMIT_STATE"), 0);
 
 		assertThat(
-				extractResult(testHarness),
-				contains("ON_ELEMENT:1:CIAO", "ON_ELEMENT:0:HELLO"));
+			extractResult(testHarness),
+			contains("ON_ELEMENT:1:CIAO", "ON_ELEMENT:0:HELLO"));
 	}
 
 	/**
@@ -114,7 +113,7 @@ public class AbstractStreamOperatorTest {
 		TestOperator testOperator = new TestOperator();
 
 		KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(testOperator, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
+			new KeyedOneInputStreamOperatorTestHarness<>(testOperator, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
 
 		testHarness.open();
 
@@ -130,14 +129,14 @@ public class AbstractStreamOperatorTest {
 		testHarness.processWatermark(10L);
 
 		assertThat(
-				extractResult(testHarness),
-				contains("ON_EVENT_TIME:HELLO"));
+			extractResult(testHarness),
+			contains("ON_EVENT_TIME:HELLO"));
 
 		testHarness.processWatermark(20L);
 
 		assertThat(
-				extractResult(testHarness),
-				contains("ON_EVENT_TIME:CIAO"));
+			extractResult(testHarness),
+			contains("ON_EVENT_TIME:CIAO"));
 	}
 
 	/**
@@ -149,7 +148,7 @@ public class AbstractStreamOperatorTest {
 		TestOperator testOperator = new TestOperator();
 
 		KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(testOperator, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
+			new KeyedOneInputStreamOperatorTestHarness<>(testOperator, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
 
 		testHarness.open();
 
@@ -165,14 +164,14 @@ public class AbstractStreamOperatorTest {
 		testHarness.setProcessingTime(10L);
 
 		assertThat(
-				extractResult(testHarness),
-				contains("ON_PROC_TIME:HELLO"));
+			extractResult(testHarness),
+			contains("ON_PROC_TIME:HELLO"));
 
 		testHarness.setProcessingTime(20L);
 
 		assertThat(
-				extractResult(testHarness),
-				contains("ON_PROC_TIME:CIAO"));
+			extractResult(testHarness),
+			contains("ON_PROC_TIME:CIAO"));
 	}
 
 	/**
@@ -183,7 +182,7 @@ public class AbstractStreamOperatorTest {
 		TestOperator testOperator = new TestOperator();
 
 		KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(testOperator, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
+			new KeyedOneInputStreamOperatorTestHarness<>(testOperator, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
 
 		testHarness.open();
 
@@ -201,10 +200,10 @@ public class AbstractStreamOperatorTest {
 		TestOperator testOperator1 = new TestOperator();
 
 		KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness1 =
-				new KeyedOneInputStreamOperatorTestHarness<>(
-						testOperator1,
-						new TestKeySelector(),
-						BasicTypeInfo.INT_TYPE_INFO);
+			new KeyedOneInputStreamOperatorTestHarness<>(
+				testOperator1,
+				new TestKeySelector(),
+				BasicTypeInfo.INT_TYPE_INFO);
 
 		testHarness1.setProcessingTime(0L);
 
@@ -215,14 +214,14 @@ public class AbstractStreamOperatorTest {
 		testHarness1.setProcessingTime(10L);
 
 		assertThat(
-				extractResult(testHarness1),
-				contains("ON_PROC_TIME:HELLO"));
+			extractResult(testHarness1),
+			contains("ON_PROC_TIME:HELLO"));
 
 		testHarness1.setProcessingTime(20L);
 
 		assertThat(
-				extractResult(testHarness1),
-				contains("ON_PROC_TIME:CIAO"));
+			extractResult(testHarness1),
+			contains("ON_PROC_TIME:CIAO"));
 	}
 
 
@@ -234,7 +233,7 @@ public class AbstractStreamOperatorTest {
 		TestOperator testOperator = new TestOperator();
 
 		KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(testOperator, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
+			new KeyedOneInputStreamOperatorTestHarness<>(testOperator, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
 
 		testHarness.open();
 
@@ -249,14 +248,14 @@ public class AbstractStreamOperatorTest {
 		testHarness.processWatermark(20L);
 
 		assertThat(
-				extractResult(testHarness),
-				contains("ON_EVENT_TIME:HELLO"));
+			extractResult(testHarness),
+			contains("ON_EVENT_TIME:HELLO"));
 
 		testHarness.setProcessingTime(10L);
 
 		assertThat(
-				extractResult(testHarness),
-				contains("ON_PROC_TIME:HELLO"));
+			extractResult(testHarness),
+			contains("ON_PROC_TIME:HELLO"));
 	}
 
 	/**
@@ -281,13 +280,13 @@ public class AbstractStreamOperatorTest {
 		TestOperator testOperator = new TestOperator();
 
 		KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(
-						testOperator,
-						new TestKeySelector(),
-						BasicTypeInfo.INT_TYPE_INFO,
-						maxParallelism,
-						1, /* num subtasks */
-						0 /* subtask index */);
+			new KeyedOneInputStreamOperatorTestHarness<>(
+				testOperator,
+				new TestKeySelector(),
+				BasicTypeInfo.INT_TYPE_INFO,
+				maxParallelism,
+				1, /* num subtasks */
+				0 /* subtask index */);
 
 		testHarness.open();
 
@@ -314,13 +313,13 @@ public class AbstractStreamOperatorTest {
 		TestOperator testOperator1 = new TestOperator();
 
 		KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness1 =
-				new KeyedOneInputStreamOperatorTestHarness<>(
-						testOperator1,
-						new TestKeySelector(),
-						BasicTypeInfo.INT_TYPE_INFO,
-						maxParallelism,
-						2, /* num subtasks */
-						0 /* subtask index */);
+			new KeyedOneInputStreamOperatorTestHarness<>(
+				testOperator1,
+				new TestKeySelector(),
+				BasicTypeInfo.INT_TYPE_INFO,
+				maxParallelism,
+				2, /* num subtasks */
+				0 /* subtask index */);
 
 		testHarness1.setup();
 		testHarness1.initializeState(initState1);
@@ -357,13 +356,13 @@ public class AbstractStreamOperatorTest {
 		TestOperator testOperator2 = new TestOperator();
 
 		KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness2 =
-				new KeyedOneInputStreamOperatorTestHarness<>(
-						testOperator2,
-						new TestKeySelector(),
-						BasicTypeInfo.INT_TYPE_INFO,
-						maxParallelism,
-						2, /* num subtasks */
-						1 /* subtask index */);
+			new KeyedOneInputStreamOperatorTestHarness<>(
+				testOperator2,
+				new TestKeySelector(),
+				BasicTypeInfo.INT_TYPE_INFO,
+				maxParallelism,
+				2, /* num subtasks */
+				1 /* subtask index */);
 
 		testHarness2.setup();
 		testHarness2.initializeState(initState2);
@@ -687,24 +686,24 @@ public class AbstractStreamOperatorTest {
 	 * state or setting timers.
 	 */
 	private static class TestOperator
-			extends AbstractStreamOperator<String>
-			implements OneInputStreamOperator<Tuple2<Integer, String>, String>, Triggerable<Integer, VoidNamespace> {
+		extends AbstractStreamOperator<String>
+		implements OneInputStreamOperator<Tuple2<Integer, String>, String>, Triggerable<Integer, VoidNamespace> {
 
 		private static final long serialVersionUID = 1L;
 
 		private transient InternalTimerService<VoidNamespace> timerService;
 
 		private final ValueStateDescriptor<String> stateDescriptor =
-				new ValueStateDescriptor<>("state", StringSerializer.INSTANCE);
+			new ValueStateDescriptor<>("state", StringSerializer.INSTANCE);
 
 		@Override
 		public void open() throws Exception {
 			super.open();
 
 			this.timerService = getInternalTimerService(
-					"test-timers",
-					VoidNamespaceSerializer.INSTANCE,
-					this);
+				"test-timers",
+				VoidNamespaceSerializer.INSTANCE,
+				this);
 		}
 
 		@Override