You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/12/12 19:27:37 UTC
[10/10] flink git commit: [FLINK-8239][tests] StreamTaskTestHarness
supports 2-input head operators
[FLINK-8239][tests] StreamTaskTestHarness supports 2-input head operators
This closes #5153.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c1665c12
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c1665c12
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c1665c12
Branch: refs/heads/master
Commit: c1665c12b49752af2f4d2c624095dcec432efe8e
Parents: 6e89878
Author: zentol <ch...@apache.org>
Authored: Mon Dec 11 15:28:07 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Tue Dec 12 19:09:18 2017 +0100
----------------------------------------------------------------------
.../runtime/tasks/StreamTaskTestHarness.java | 7 +++
.../tasks/StreamTaskTestHarnessTest.java | 46 ++++++++++++++++++++
2 files changed, 53 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c1665c12/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 2700a70..3c8dd0b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -36,6 +36,7 @@ import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
@@ -379,6 +380,12 @@ public class StreamTaskTestHarness<OUT> {
return new StreamConfigChainer(headOperatorId, headOperator, getStreamConfig());
}
+ public StreamConfigChainer setupOperatorChain(OperatorID headOperatorId, TwoInputStreamOperator<?, ?, ?> headOperator) {
+ Preconditions.checkState(!setupCalled, "This harness was already setup.");
+ setupCalled = true;
+ return new StreamConfigChainer(headOperatorId, headOperator, getStreamConfig());
+ }
+
// ------------------------------------------------------------------------
private class TaskThread extends Thread {
http://git-wip-us.apache.org/repos/asf/flink/blob/c1665c12/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarnessTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarnessTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarnessTest.java
index 9fba5f8..249a326 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarnessTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarnessTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.junit.Assert;
@@ -52,6 +53,12 @@ public class StreamTaskTestHarnessTest {
} catch (IllegalStateException expected) {
// expected
}
+ try {
+ harness.setupOperatorChain(new OperatorID(), new TwoInputTestOperator());
+ Assert.fail();
+ } catch (IllegalStateException expected) {
+ // expected
+ }
harness = new StreamTaskTestHarness<>(new OneInputStreamTask<>(), BasicTypeInfo.STRING_TYPE_INFO);
harness.setupOperatorChain(new OperatorID(), new TestOperator())
@@ -69,6 +76,35 @@ public class StreamTaskTestHarnessTest {
} catch (IllegalStateException expected) {
// expected
}
+ try {
+ harness.setupOperatorChain(new OperatorID(), new TwoInputTestOperator());
+ Assert.fail();
+ } catch (IllegalStateException expected) {
+ // expected
+ }
+
+ harness = new StreamTaskTestHarness<>(new TwoInputStreamTask<>(), BasicTypeInfo.STRING_TYPE_INFO);
+ harness.setupOperatorChain(new OperatorID(), new TwoInputTestOperator())
+ .chain(new OperatorID(), new TestOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()));
+
+ try {
+ harness.setupOutputForSingletonOperatorChain();
+ Assert.fail();
+ } catch (IllegalStateException expected) {
+ // expected
+ }
+ try {
+ harness.setupOperatorChain(new OperatorID(), new TestOperator());
+ Assert.fail();
+ } catch (IllegalStateException expected) {
+ // expected
+ }
+ try {
+ harness.setupOperatorChain(new OperatorID(), new TwoInputTestOperator());
+ Assert.fail();
+ } catch (IllegalStateException expected) {
+ // expected
+ }
}
private static class TestOperator extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String> {
@@ -76,4 +112,14 @@ public class StreamTaskTestHarnessTest {
public void processElement(StreamRecord<String> element) throws Exception {
}
}
+
+ private static class TwoInputTestOperator extends AbstractStreamOperator<String> implements TwoInputStreamOperator<String, String, String> {
+ @Override
+ public void processElement1(StreamRecord<String> element) throws Exception {
+ }
+
+ @Override
+ public void processElement2(StreamRecord<String> element) throws Exception {
+ }
+ }
}