You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/12/12 19:27:36 UTC
[09/10] flink git commit: [FLINK-8238][tests] Forbid multiple setups of StreamTaskTestHarness
[FLINK-8238][tests] Forbid multiple setups of StreamTaskTestHarness
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6e898781
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6e898781
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6e898781
Branch: refs/heads/master
Commit: 6e89878166c0b8b8193ae69a675b6d085ffa9fe7
Parents: 5a545db
Author: zentol <ch...@apache.org>
Authored: Mon Dec 11 15:26:55 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Tue Dec 12 19:09:17 2017 +0100
----------------------------------------------------------------------
.../runtime/tasks/StreamTaskTestHarness.java | 7 ++
.../tasks/StreamTaskTestHarnessTest.java | 79 ++++++++++++++++++++
2 files changed, 86 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6e898781/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 1187d66..2700a70 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -39,6 +39,7 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
+import org.apache.flink.util.Preconditions;
import org.junit.Assert;
@@ -89,6 +90,8 @@ public class StreamTaskTestHarness<OUT> {
protected int numInputGates;
protected int numInputChannelsPerGate;
+ private boolean setupCalled = false;
+
@SuppressWarnings("rawtypes")
protected StreamTestSingleInputGate[] inputGates;
@@ -134,6 +137,8 @@ public class StreamTaskTestHarness<OUT> {
* please manually configure the stream config.
*/
public void setupOutputForSingletonOperatorChain() {
+ Preconditions.checkState(!setupCalled, "This harness was already setup.");
+ setupCalled = true;
streamConfig.setChainStart();
streamConfig.setBufferTimeout(0);
streamConfig.setTimeCharacteristic(TimeCharacteristic.EventTime);
@@ -369,6 +374,8 @@ public class StreamTaskTestHarness<OUT> {
}
public StreamConfigChainer setupOperatorChain(OperatorID headOperatorId, OneInputStreamOperator<?, ?> headOperator) {
+ Preconditions.checkState(!setupCalled, "This harness was already setup."); // reject a second setup call on the same harness
+ setupCalled = true; // same flag that setupOutputForSingletonOperatorChain() checks and sets
return new StreamConfigChainer(headOperatorId, headOperator, getStreamConfig());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e898781/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarnessTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarnessTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarnessTest.java
new file mode 100644
index 0000000..9fba5f8
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarnessTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for the {@link StreamTaskTestHarness}; currently verifies that a harness instance can only be set up once.
+ */
+public class StreamTaskTestHarnessTest {
+
+ @Test
+ public void testMultipleSetupsThrowsException() {
+ StreamTaskTestHarness<String> harness;
+ // Case 1: the initial setup is done via setupOutputForSingletonOperatorChain().
+ harness = new StreamTaskTestHarness<>(new OneInputStreamTask<>(), BasicTypeInfo.STRING_TYPE_INFO);
+ harness.setupOutputForSingletonOperatorChain();
+
+ try {
+ harness.setupOutputForSingletonOperatorChain();
+ Assert.fail();
+ } catch (IllegalStateException expected) {
+ // expected: repeating the same setup method on an already set-up harness is rejected
+ }
+ try {
+ harness.setupOperatorChain(new OperatorID(), new TestOperator());
+ Assert.fail();
+ } catch (IllegalStateException expected) {
+ // expected: the other setup method is rejected as well once the harness is set up
+ }
+ // Case 2: a fresh harness whose initial setup is done via setupOperatorChain().
+ harness = new StreamTaskTestHarness<>(new OneInputStreamTask<>(), BasicTypeInfo.STRING_TYPE_INFO);
+ harness.setupOperatorChain(new OperatorID(), new TestOperator())
+ .chain(new OperatorID(), new TestOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()));
+
+ try {
+ harness.setupOutputForSingletonOperatorChain();
+ Assert.fail();
+ } catch (IllegalStateException expected) {
+ // expected: the singleton-chain setup is rejected after setupOperatorChain() was used
+ }
+ try {
+ harness.setupOperatorChain(new OperatorID(), new TestOperator());
+ Assert.fail();
+ } catch (IllegalStateException expected) {
+ // expected: a second setupOperatorChain() call on the same harness is rejected
+ }
+ }
+
+ private static class TestOperator extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String> {
+ @Override
+ public void processElement(StreamRecord<String> element) throws Exception { // intentionally a no-op
+ }
+ }
+}