You are viewing a plain-text version of this content. The canonical link for it was not preserved in this copy.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/12/12 19:27:37 UTC

[10/10] flink git commit: [FLINK-8239][tests] StreamTaskTestHarness supports 2-input head operators

[FLINK-8239][tests] StreamTaskTestHarness supports 2-input head operators

This closes #5153.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c1665c12
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c1665c12
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c1665c12

Branch: refs/heads/master
Commit: c1665c12b49752af2f4d2c624095dcec432efe8e
Parents: 6e89878
Author: zentol <ch...@apache.org>
Authored: Mon Dec 11 15:28:07 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Tue Dec 12 19:09:18 2017 +0100

----------------------------------------------------------------------
 .../runtime/tasks/StreamTaskTestHarness.java    |  7 +++
 .../tasks/StreamTaskTestHarnessTest.java        | 46 ++++++++++++++++++++
 2 files changed, 53 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c1665c12/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 2700a70..3c8dd0b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -36,6 +36,7 @@ import org.apache.flink.streaming.api.graph.StreamNode;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
@@ -379,6 +380,12 @@ public class StreamTaskTestHarness<OUT> {
 		return new StreamConfigChainer(headOperatorId, headOperator, getStreamConfig());
 	}
 
+	public StreamConfigChainer setupOperatorChain(OperatorID headOperatorId, TwoInputStreamOperator<?, ?, ?> headOperator) {
+		Preconditions.checkState(!setupCalled, "This harness was already setup.");
+		setupCalled = true;
+		return new StreamConfigChainer(headOperatorId, headOperator, getStreamConfig());
+	}
+
 	// ------------------------------------------------------------------------
 
 	private class TaskThread extends Thread {

http://git-wip-us.apache.org/repos/asf/flink/blob/c1665c12/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarnessTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarnessTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarnessTest.java
index 9fba5f8..249a326 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarnessTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarnessTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import org.junit.Assert;
@@ -52,6 +53,12 @@ public class StreamTaskTestHarnessTest {
 		} catch (IllegalStateException expected) {
 			// expected
 		}
+		try {
+			harness.setupOperatorChain(new OperatorID(), new TwoInputTestOperator());
+			Assert.fail();
+		} catch (IllegalStateException expected) {
+			// expected
+		}
 
 		harness = new StreamTaskTestHarness<>(new OneInputStreamTask<>(), BasicTypeInfo.STRING_TYPE_INFO);
 		harness.setupOperatorChain(new OperatorID(), new TestOperator())
@@ -69,6 +76,35 @@ public class StreamTaskTestHarnessTest {
 		} catch (IllegalStateException expected) {
 			// expected
 		}
+		try {
+			harness.setupOperatorChain(new OperatorID(), new TwoInputTestOperator());
+			Assert.fail();
+		} catch (IllegalStateException expected) {
+			// expected
+		}
+
+		harness = new StreamTaskTestHarness<>(new TwoInputStreamTask<>(), BasicTypeInfo.STRING_TYPE_INFO);
+		harness.setupOperatorChain(new OperatorID(), new TwoInputTestOperator())
+			.chain(new OperatorID(), new TestOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()));
+
+		try {
+			harness.setupOutputForSingletonOperatorChain();
+			Assert.fail();
+		} catch (IllegalStateException expected) {
+			// expected
+		}
+		try {
+			harness.setupOperatorChain(new OperatorID(), new TestOperator());
+			Assert.fail();
+		} catch (IllegalStateException expected) {
+			// expected
+		}
+		try {
+			harness.setupOperatorChain(new OperatorID(), new TwoInputTestOperator());
+			Assert.fail();
+		} catch (IllegalStateException expected) {
+			// expected
+		}
 	}
 
 	private static class TestOperator extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String> {
@@ -76,4 +112,14 @@ public class StreamTaskTestHarnessTest {
 		public void processElement(StreamRecord<String> element) throws Exception {
 		}
 	}
+
+	private static class TwoInputTestOperator extends AbstractStreamOperator<String> implements TwoInputStreamOperator<String, String, String> {
+		@Override
+		public void processElement1(StreamRecord<String> element) throws Exception {
+		}
+
+		@Override
+		public void processElement2(StreamRecord<String> element) throws Exception {
+		}
+	}
 }