You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/12/12 19:27:36 UTC

[09/10] flink git commit: [FLINK-8238][tests] Forbid multiple setups of StreamTaskTestHarness

[FLINK-8238][tests] Forbid multiple setups of StreamTaskTestHarness


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6e898781
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6e898781
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6e898781

Branch: refs/heads/master
Commit: 6e89878166c0b8b8193ae69a675b6d085ffa9fe7
Parents: 5a545db
Author: zentol <ch...@apache.org>
Authored: Mon Dec 11 15:26:55 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Tue Dec 12 19:09:17 2017 +0100

----------------------------------------------------------------------
 .../runtime/tasks/StreamTaskTestHarness.java    |  7 ++
 .../tasks/StreamTaskTestHarnessTest.java        | 79 ++++++++++++++++++++
 2 files changed, 86 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6e898781/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 1187d66..2700a70 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -39,6 +39,7 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
+import org.apache.flink.util.Preconditions;
 
 import org.junit.Assert;
 
@@ -89,6 +90,8 @@ public class StreamTaskTestHarness<OUT> {
 	protected int numInputGates;
 	protected int numInputChannelsPerGate;
 
+	private boolean setupCalled = false;
+
 	@SuppressWarnings("rawtypes")
 	protected StreamTestSingleInputGate[] inputGates;
 
@@ -134,6 +137,8 @@ public class StreamTaskTestHarness<OUT> {
 	 * please manually configure the stream config.
 	 */
 	public void setupOutputForSingletonOperatorChain() {
+		Preconditions.checkState(!setupCalled, "This harness was already setup.");
+		setupCalled = true;
 		streamConfig.setChainStart();
 		streamConfig.setBufferTimeout(0);
 		streamConfig.setTimeCharacteristic(TimeCharacteristic.EventTime);
@@ -369,6 +374,8 @@ public class StreamTaskTestHarness<OUT> {
 	}
 
 	public StreamConfigChainer setupOperatorChain(OperatorID headOperatorId, OneInputStreamOperator<?, ?> headOperator) {
+		Preconditions.checkState(!setupCalled, "This harness was already setup."); // a harness may only be set up once
+		setupCalled = true; // record the setup so that any later setup call is rejected
 		return new StreamConfigChainer(headOperatorId, headOperator, getStreamConfig());
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e898781/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarnessTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarnessTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarnessTest.java
new file mode 100644
index 0000000..9fba5f8
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarnessTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for the {@link StreamTaskTestHarness}: once a harness has been set up,
+ * every further setup call has to be rejected.
+ */
+public class StreamTaskTestHarnessTest {
+
+	/** Any setup method invoked after a first setup must throw an {@link IllegalStateException}. */
+	@Test
+	public void testMultipleSetupsThrowsException() {
+		// first setup done through the singleton-chain method
+		final StreamTaskTestHarness<String> singletonHarness =
+			new StreamTaskTestHarness<>(new OneInputStreamTask<>(), BasicTypeInfo.STRING_TYPE_INFO);
+		singletonHarness.setupOutputForSingletonOperatorChain();
+		assertFurtherSetupsFail(singletonHarness);
+
+		// first setup done through the operator-chain method
+		final StreamTaskTestHarness<String> chainedHarness =
+			new StreamTaskTestHarness<>(new OneInputStreamTask<>(), BasicTypeInfo.STRING_TYPE_INFO);
+		chainedHarness
+			.setupOperatorChain(new OperatorID(), new TestOperator())
+			.chain(new OperatorID(), new TestOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()));
+		assertFurtherSetupsFail(chainedHarness);
+	}
+
+	/** Invokes both setup methods on an already set up harness and expects each call to be rejected. */
+	private static void assertFurtherSetupsFail(StreamTaskTestHarness<String> harness) {
+		try {
+			harness.setupOutputForSingletonOperatorChain();
+			Assert.fail();
+		} catch (IllegalStateException expected) {
+			// the repeated setup was rejected
+		}
+
+		try {
+			harness.setupOperatorChain(new OperatorID(), new TestOperator());
+			Assert.fail();
+		} catch (IllegalStateException expected) {
+			// the repeated setup was rejected
+		}
+	}
+
+	/** Operator whose {@code processElement} does nothing; it only serves to populate a chain. */
+	private static class TestOperator extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String> {
+		@Override
+		public void processElement(StreamRecord<String> element) throws Exception {
+			// intentionally empty
+		}
+	}
+}