You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/08/19 18:42:11 UTC
[2/6] flink git commit: [FLINK-2398][api-breaking] Introduce StreamGraphGenerator
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index 0fad3dd..285ee57 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -1,27 +1,22 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
package org.apache.flink.streaming.api;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -31,146 +26,235 @@ import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.IterativeDataStream;
import org.apache.flink.streaming.api.datastream.IterativeDataStream.ConnectedIterativeDataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.api.graph.StreamLoop;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
+import org.apache.flink.streaming.util.EvenOddOutputSelector;
+import org.apache.flink.streaming.util.NoOpIntMap;
+import org.apache.flink.streaming.util.ReceiveCheckNoOpSink;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.util.Collector;
import org.junit.Test;
+import static org.junit.Assert.*;
+
@SuppressWarnings({ "unchecked", "unused", "serial" })
public class IterateTest extends StreamingMultipleProgramsTestBase {
- private static final long MEMORYSIZE = 32;
private static boolean iterated[];
- private static int PARALLELISM = 2;
- @Test
- public void testException() throws Exception {
+ @Test(expected = UnsupportedOperationException.class)
+ public void testIncorrectParallelism() throws Exception {
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
DataStream<Integer> source = env.fromElements(1, 10);
+
+ IterativeDataStream<Integer> iter1 = source.iterate();
+ SingleOutputStreamOperator<Integer, ?> map1 = iter1.map(NoOpIntMap);
+ iter1.closeWith(map1).print();
+ }
+
+ @Test
+ public void testDoubleClosing() throws Exception {
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ // introduce dummy mapper to get to correct parallelism
+ DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap);
+
IterativeDataStream<Integer> iter1 = source.iterate();
- IterativeDataStream<Integer> iter2 = source.iterate();
iter1.closeWith(iter1.map(NoOpIntMap));
- // Check for double closing
- try {
- iter1.closeWith(iter1.map(NoOpIntMap));
- fail();
- } catch (Exception e) {
- }
+ iter1.closeWith(iter1.map(NoOpIntMap));
+ }
- // Check for closing iteration without head
- try {
- iter2.closeWith(iter1.map(NoOpIntMap));
- fail();
- } catch (Exception e) {
- }
- iter2.map(NoOpIntMap);
+ @Test(expected = UnsupportedOperationException.class)
+ public void testDifferingParallelism() throws Exception {
- // Check for executing with empty iteration
- try {
- env.execute();
- fail();
- } catch (Exception e) {
- }
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ // introduce dummy mapper to get to correct parallelism
+ DataStream<Integer> source = env.fromElements(1, 10)
+ .map(NoOpIntMap);
+
+ IterativeDataStream<Integer> iter1 = source.iterate();
+
+
+ iter1.closeWith(iter1.map(NoOpIntMap).setParallelism(DEFAULT_PARALLELISM / 2));
+
+ }
+
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testCoDifferingParallelism() throws Exception {
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ // introduce dummy mapper to get to correct parallelism
+ DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap);
+
+ ConnectedIterativeDataStream<Integer, Integer> coIter = source.iterate().withFeedbackType(
+ Integer.class);
+
+
+ coIter.closeWith(coIter.map(NoOpIntCoMap).setParallelism(DEFAULT_PARALLELISM / 2));
+
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testClosingFromOutOfLoop() throws Exception {
+
+ // this test verifies that we cannot close an iteration with a DataStream that does not
+ // have the iteration in its predecessors
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ // introduce dummy mapper to get to correct parallelism
+ DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap);
+
+ IterativeDataStream<Integer> iter1 = source.iterate();
+ IterativeDataStream<Integer> iter2 = source.iterate();
+
+
+ iter2.closeWith(iter1.map(NoOpIntMap));
+
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testCoIterClosingFromOutOfLoop() throws Exception {
+
+ // this test verifies that we cannot close an iteration with a DataStream that does not
+ // have the iteration in its predecessors
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ // introduce dummy mapper to get to correct parallelism
+ DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap);
+
+ IterativeDataStream<Integer> iter1 = source.iterate();
+ ConnectedIterativeDataStream<Integer, Integer> coIter = source.iterate().withFeedbackType(
+ Integer.class);
+
+
+ coIter.closeWith(iter1.map(NoOpIntMap));
+
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testExecutionWithEmptyIteration() throws Exception {
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap);
+
+ IterativeDataStream<Integer> iter1 = source.iterate();
+
+ iter1.map(NoOpIntMap).print();
+
+ env.execute();
}
@Test
public void testImmutabilityWithCoiteration() {
- StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
- DataStream<Integer> source = env.fromElements(1, 10);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap); // for rebalance
IterativeDataStream<Integer> iter1 = source.iterate();
// Calling withFeedbackType should create a new iteration
ConnectedIterativeDataStream<Integer, String> iter2 = iter1.withFeedbackType(String.class);
- iter1.closeWith(iter1.map(NoOpIntMap));
- iter2.closeWith(iter2.map(NoOpCoMap));
+ iter1.closeWith(iter1.map(NoOpIntMap)).print();
+ iter2.closeWith(iter2.map(NoOpCoMap)).print();
StreamGraph graph = env.getStreamGraph();
- graph.getJobGraph();
+ assertEquals(2, graph.getIterationSourceSinkPairs().size());
- assertEquals(2, graph.getStreamLoops().size());
- for (StreamLoop loop : graph.getStreamLoops()) {
- assertEquals(loop.getHeads(), loop.getTails());
- List<Tuple2<StreamNode, StreamNode>> sourceSinkPairs = loop.getSourceSinkPairs();
- assertEquals(1, sourceSinkPairs.size());
+ for (Tuple2<StreamNode, StreamNode> sourceSinkPair: graph.getIterationSourceSinkPairs()) {
+ assertEquals(sourceSinkPair.f0.getOutEdges().get(0).getTargetVertex(), sourceSinkPair.f1.getInEdges().get(0).getSourceVertex());
}
}
@Test
public void testmultipleHeadsTailsSimple() {
- StreamExecutionEnvironment env = new TestStreamEnvironment(4, MEMORYSIZE);
- DataStream<Integer> source1 = env.fromElements(1, 2, 3, 4, 5).shuffle();
- DataStream<Integer> source2 = env.fromElements(1, 2, 3, 4, 5);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- IterativeDataStream<Integer> iter1 = source1.union(source2).iterate();
+ DataStream<Integer> source1 = env.fromElements(1, 2, 3, 4, 5)
+ .shuffle()
+ .map(NoOpIntMap).name("ParallelizeMapShuffle");
+ DataStream<Integer> source2 = env.fromElements(1, 2, 3, 4, 5)
+ .map(NoOpIntMap).name("ParallelizeMapRebalance");
- DataStream<Integer> head1 = iter1.map(NoOpIntMap);
- DataStream<Integer> head2 = iter1.map(NoOpIntMap).setParallelism(2);
- DataStream<Integer> head3 = iter1.map(NoOpIntMap).setParallelism(2)
- .addSink(new NoOpSink<Integer>());
- DataStream<Integer> head4 = iter1.map(NoOpIntMap).addSink(new NoOpSink<Integer>());
+ IterativeDataStream<Integer> iter1 = source1.union(source2).iterate();
- SplitDataStream<Integer> source3 = env.fromElements(1, 2, 3, 4, 5).split(
- new OutputSelector<Integer>() {
+ DataStream<Integer> head1 = iter1.map(NoOpIntMap).name("IterRebalanceMap").setParallelism(DEFAULT_PARALLELISM / 2);
+ DataStream<Integer> head2 = iter1.map(NoOpIntMap).name("IterForwardMap");
+ DataStreamSink<Integer> head3 = iter1.map(NoOpIntMap).setParallelism(DEFAULT_PARALLELISM / 2).addSink(new ReceiveCheckNoOpSink<Integer>());
+ DataStreamSink<Integer> head4 = iter1.map(NoOpIntMap).addSink(new ReceiveCheckNoOpSink<Integer>());
- @Override
- public Iterable<String> select(Integer value) {
- return value % 2 == 0 ? Arrays.asList("even") : Arrays.asList("odd");
- }
- });
+ SplitDataStream<Integer> source3 = env.fromElements(1, 2, 3, 4, 5)
+ .map(NoOpIntMap).name("EvenOddSourceMap")
+ .split(new EvenOddOutputSelector());
iter1.closeWith(source3.select("even").union(
- head1.map(NoOpIntMap).broadcast().setParallelism(1), head2.shuffle()));
+ head1.rebalance().map(NoOpIntMap).broadcast(), head2.shuffle()));
StreamGraph graph = env.getStreamGraph();
JobGraph jg = graph.getJobGraph();
- assertEquals(1, graph.getStreamLoops().size());
- StreamLoop loop = new ArrayList<StreamLoop>(graph.getStreamLoops()).get(0);
+ assertEquals(1, graph.getIterationSourceSinkPairs().size());
- assertEquals(4, loop.getHeads().size());
- assertEquals(3, loop.getTails().size());
+ Tuple2<StreamNode, StreamNode> sourceSinkPair = graph.getIterationSourceSinkPairs().iterator().next();
+ StreamNode itSource = sourceSinkPair.f0;
+ StreamNode itSink = sourceSinkPair.f1;
- assertEquals(1, loop.getSourceSinkPairs().size());
- Tuple2<StreamNode, StreamNode> pair = loop.getSourceSinkPairs().get(0);
+ assertEquals(4, itSource.getOutEdges().size());
+ assertEquals(3, itSink.getInEdges().size());
- assertEquals(pair.f0.getParallelism(), pair.f1.getParallelism());
- assertEquals(4, pair.f0.getOutEdges().size());
- assertEquals(3, pair.f1.getInEdges().size());
+ assertEquals(itSource.getParallelism(), itSink.getParallelism());
- for (StreamEdge edge : pair.f0.getOutEdges()) {
- assertTrue(edge.getPartitioner() instanceof ShufflePartitioner);
- }
- for (StreamEdge edge : pair.f1.getInEdges()) {
- assertTrue(edge.getPartitioner() instanceof RebalancePartitioner);
+ for (StreamEdge edge : itSource.getOutEdges()) {
+ if (edge.getTargetVertex().getOperatorName().equals("IterRebalanceMap")) {
+ assertTrue(edge.getPartitioner() instanceof RebalancePartitioner);
+ } else if (edge.getTargetVertex().getOperatorName().equals("IterForwardMap")) {
+ assertTrue(edge.getPartitioner() instanceof ForwardPartitioner);
+ }
}
+ for (StreamEdge edge : itSink.getInEdges()) {
+ if (graph.getStreamNode(edge.getSourceId()).getOperatorName().equals("ParallelizeMapShuffle")) {
+ assertTrue(edge.getPartitioner() instanceof ShufflePartitioner);
+ }
- assertTrue(loop.getTailSelectedNames().contains(Arrays.asList("even")));
+ if (graph.getStreamNode(edge.getSourceId()).getOperatorName().equals("ParallelizeMapForward")) {
+ assertTrue(edge.getPartitioner() instanceof ForwardPartitioner);
+ }
+
+ if (graph.getStreamNode(edge.getSourceId()).getOperatorName().equals("EvenOddSourceMap")) {
+ assertTrue(edge.getPartitioner() instanceof ForwardPartitioner);
+ assertTrue(edge.getSelectedNames().contains("even"));
+ }
+ }
// Test co-location
@@ -193,142 +277,106 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
@Test
public void testmultipleHeadsTailsWithTailPartitioning() {
- StreamExecutionEnvironment env = new TestStreamEnvironment(4, MEMORYSIZE);
- DataStream<Integer> source1 = env.fromElements(1, 2, 3, 4, 5).shuffle();
- DataStream<Integer> source2 = env.fromElements(1, 2, 3, 4, 5);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- IterativeDataStream<Integer> iter1 = source1.union(source2).iterate();
+ DataStream<Integer> source1 = env.fromElements(1, 2, 3, 4, 5)
+ .shuffle()
+ .map(NoOpIntMap);
- DataStream<Integer> head1 = iter1.map(NoOpIntMap);
- DataStream<Integer> head2 = iter1.map(NoOpIntMap).setParallelism(2).name("shuffle");
- DataStream<Integer> head3 = iter1.map(NoOpIntMap).setParallelism(2)
- .addSink(new NoOpSink<Integer>());
- DataStream<Integer> head4 = iter1.map(NoOpIntMap).addSink(new NoOpSink<Integer>());
+ DataStream<Integer> source2 = env.fromElements(1, 2, 3, 4, 5)
+ .map(NoOpIntMap);
- SplitDataStream<Integer> source3 = env.fromElements(1, 2, 3, 4, 5).name("split")
- .split(new OutputSelector<Integer>() {
+ IterativeDataStream<Integer> iter1 = source1.union(source2).iterate();
- @Override
- public Iterable<String> select(Integer value) {
- return value % 2 == 0 ? Arrays.asList("even") : Arrays.asList("odd");
- }
- });
+ DataStream<Integer> head1 = iter1.map(NoOpIntMap).name("map1");
+ DataStream<Integer> head2 = iter1.map(NoOpIntMap).setParallelism(DEFAULT_PARALLELISM / 2).rebalance().name(
+ "shuffle");
+ DataStreamSink<Integer> head3 = iter1.map(NoOpIntMap).setParallelism(DEFAULT_PARALLELISM / 2)
+ .addSink(new ReceiveCheckNoOpSink<Integer>());
+ DataStreamSink<Integer> head4 = iter1.map(NoOpIntMap).addSink(new ReceiveCheckNoOpSink<Integer>());
+
+ SplitDataStream<Integer> source3 = env.fromElements(1, 2, 3, 4, 5)
+ .map(NoOpIntMap)
+ .name("split")
+ .split(new EvenOddOutputSelector());
iter1.closeWith(
source3.select("even").union(
- head1.map(NoOpIntMap).broadcast().setParallelism(1).name("bc"),
- head2.shuffle()), true);
+ head1.map(NoOpIntMap).broadcast().name("bc"),
+ head2.map(NoOpIntMap).shuffle()));
StreamGraph graph = env.getStreamGraph();
JobGraph jg = graph.getJobGraph();
- assertEquals(1, graph.getStreamLoops().size());
+ assertEquals(1, graph.getIterationSourceSinkPairs().size());
- StreamLoop loop = new ArrayList<StreamLoop>(graph.getStreamLoops()).get(0);
+ Tuple2<StreamNode, StreamNode> sourceSinkPair = graph.getIterationSourceSinkPairs().iterator().next();
+ StreamNode itSource = sourceSinkPair.f0;
+ StreamNode itSink = sourceSinkPair.f1;
- assertEquals(4, loop.getHeads().size());
- assertEquals(3, loop.getTails().size());
+ assertEquals(4, itSource.getOutEdges().size());
+ assertEquals(3, itSink.getInEdges().size());
- assertEquals(2, loop.getSourceSinkPairs().size());
- List<Tuple2<StreamNode, StreamNode>> pairs = loop.getSourceSinkPairs();
- Tuple2<StreamNode, StreamNode> pair1 = pairs.get(0).f0.getParallelism() == 2 ? pairs.get(0)
- : pairs.get(1);
- Tuple2<StreamNode, StreamNode> pair2 = pairs.get(0).f0.getParallelism() == 4 ? pairs.get(0)
- : pairs.get(1);
- assertEquals(pair1.f0.getParallelism(), pair1.f1.getParallelism());
- assertEquals(2, pair1.f0.getParallelism());
- assertEquals(2, pair1.f0.getOutEdges().size());
- assertEquals(3, pair1.f1.getInEdges().size());
+ assertEquals(itSource.getParallelism(), itSink.getParallelism());
- for (StreamEdge edge : pair1.f0.getOutEdges()) {
- assertTrue(edge.getPartitioner() instanceof RebalancePartitioner);
- assertEquals(2, edge.getTargetVertex().getParallelism());
- }
- for (StreamEdge edge : pair1.f1.getInEdges()) {
- String tailName = edge.getSourceVertex().getOperatorName();
- if (tailName.equals("split")) {
+ for (StreamEdge edge : itSource.getOutEdges()) {
+ if (edge.getTargetVertex().getOperatorName().equals("map1")) {
+ assertTrue(edge.getPartitioner() instanceof ForwardPartitioner);
+ assertEquals(4, edge.getTargetVertex().getParallelism());
+ } else if (edge.getTargetVertex().getOperatorName().equals("shuffle")) {
assertTrue(edge.getPartitioner() instanceof RebalancePartitioner);
- } else if (tailName.equals("bc")) {
- assertTrue(edge.getPartitioner() instanceof BroadcastPartitioner);
- } else if (tailName.equals("shuffle")) {
- assertTrue(edge.getPartitioner() instanceof ShufflePartitioner);
+ assertEquals(2, edge.getTargetVertex().getParallelism());
}
-
}
-
- assertEquals(pair2.f0.getParallelism(), pair2.f1.getParallelism());
- assertEquals(4, pair2.f0.getParallelism());
- assertEquals(2, pair2.f0.getOutEdges().size());
- assertEquals(3, pair2.f1.getInEdges().size());
-
- for (StreamEdge edge : pair2.f0.getOutEdges()) {
- assertTrue(edge.getPartitioner() instanceof RebalancePartitioner);
- assertEquals(4, edge.getTargetVertex().getParallelism());
- }
- for (StreamEdge edge : pair2.f1.getInEdges()) {
+ for (StreamEdge edge : itSink.getInEdges()) {
String tailName = edge.getSourceVertex().getOperatorName();
if (tailName.equals("split")) {
- assertTrue(edge.getPartitioner() instanceof RebalancePartitioner);
+ assertTrue(edge.getPartitioner() instanceof ForwardPartitioner);
+ assertTrue(edge.getSelectedNames().contains("even"));
} else if (tailName.equals("bc")) {
assertTrue(edge.getPartitioner() instanceof BroadcastPartitioner);
} else if (tailName.equals("shuffle")) {
assertTrue(edge.getPartitioner() instanceof ShufflePartitioner);
}
-
}
- assertTrue(loop.getTailSelectedNames().contains(Arrays.asList("even")));
-
// Test co-location
JobVertex itSource1 = null;
- JobVertex itSource2 = null;
JobVertex itSink1 = null;
- JobVertex itSink2 = null;
for (JobVertex vertex : jg.getVertices()) {
if (vertex.getName().contains("IterationSource")) {
- if (vertex.getName().contains("_0")) {
- itSource1 = vertex;
- } else if (vertex.getName().contains("_1")) {
- itSource2 = vertex;
- }
+ itSource1 = vertex;
} else if (vertex.getName().contains("IterationSink")) {
- if (vertex.getName().contains("_0")) {
- itSink1 = vertex;
- } else if (vertex.getName().contains("_1")) {
- itSink2 = vertex;
- }
+ itSink1 = vertex;
}
}
assertTrue(itSource1.getCoLocationGroup() != null);
- assertTrue(itSource2.getCoLocationGroup() != null);
+ assertTrue(itSink1.getCoLocationGroup() != null);
assertEquals(itSource1.getCoLocationGroup(), itSink1.getCoLocationGroup());
- assertEquals(itSource2.getCoLocationGroup(), itSink2.getCoLocationGroup());
- assertNotEquals(itSource1.getCoLocationGroup(), itSource2.getCoLocationGroup());
}
@SuppressWarnings("rawtypes")
@Test
public void testSimpleIteration() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(PARALLELISM);
- iterated = new boolean[PARALLELISM];
+ iterated = new boolean[DEFAULT_PARALLELISM];
- DataStream<Boolean> source = env
- .fromCollection(Collections.nCopies(PARALLELISM * 2, false));
+ DataStream<Boolean> source = env.fromCollection(Collections.nCopies(DEFAULT_PARALLELISM * 2, false))
+ .map(NoOpBoolMap).name("ParallelizeMap");
IterativeDataStream<Boolean> iteration = source.iterate(3000);
DataStream<Boolean> increment = iteration.flatMap(new IterationHead()).map(NoOpBoolMap);
- iteration.map(NoOpBoolMap).addSink(new NoOpSink());
+ iteration.map(NoOpBoolMap).addSink(new ReceiveCheckNoOpSink());
- iteration.closeWith(increment).addSink(new NoOpSink());
+ iteration.closeWith(increment).addSink(new ReceiveCheckNoOpSink());
env.execute();
@@ -343,7 +391,13 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
- ConnectedIterativeDataStream<Integer, String> coIt = env.fromElements(0, 0).iterate(2000)
+ DataStream<String> otherSource = env.fromElements("1000", "2000")
+ .map(NoOpStrMap).name("ParallelizeMap");
+
+
+ ConnectedIterativeDataStream<Integer, String> coIt = env.fromElements(0, 0)
+ .map(NoOpIntMap).name("ParallelizeMap")
+ .iterate(2000)
.withFeedbackType("String");
try {
@@ -392,25 +446,24 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
public String map2(String value) throws Exception {
return value;
}
- }).setParallelism(1).addSink(new NoOpSink<String>());
+ }).addSink(new ReceiveCheckNoOpSink<String>());
- coIt.closeWith(head.broadcast().union(env.fromElements("1000", "2000").rebalance()));
+ coIt.closeWith(head.broadcast().union(otherSource));
head.addSink(new TestSink()).setParallelism(1);
+ assertEquals(1, env.getStreamGraph().getIterationSourceSinkPairs().size());
+
env.execute();
Collections.sort(TestSink.collected);
assertEquals(Arrays.asList("1", "1", "2", "2", "2", "2"), TestSink.collected);
- assertEquals(2, new ArrayList<StreamLoop>(env.getStreamGraph().getStreamLoops()).get(0)
- .getSourceSinkPairs().size());
-
}
@Test
public void testGroupByFeedback() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(3);
+ env.setParallelism(DEFAULT_PARALLELISM - 1);
KeySelector<Integer, Integer> key = new KeySelector<Integer, Integer>() {
@@ -420,7 +473,8 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
}
};
- DataStream<Integer> source = env.fromElements(1, 2, 3);
+ DataStream<Integer> source = env.fromElements(1, 2, 3)
+ .map(NoOpIntMap).name("ParallelizeMap");
IterativeDataStream<Integer> it = source.groupBy(key).iterate(3000);
@@ -448,8 +502,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
}
});
- it.closeWith(head.groupBy(key).union(head.map(NoOpIntMap).setParallelism(2).groupBy(key)),
- true).addSink(new NoOpSink<Integer>());
+ it.closeWith(head.groupBy(key).union(head.map(NoOpIntMap).groupBy(key))).addSink(new ReceiveCheckNoOpSink<Integer>());
env.execute();
}
@@ -457,15 +510,17 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
@SuppressWarnings("deprecation")
@Test
public void testWithCheckPointing() throws Exception {
- StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
env.enableCheckpointing();
- DataStream<Boolean> source = env
- .fromCollection(Collections.nCopies(PARALLELISM * 2, false));
+ DataStream<Boolean> source = env .fromCollection(Collections.nCopies(DEFAULT_PARALLELISM * 2, false))
+ .map(NoOpBoolMap).name("ParallelizeMap");
+
IterativeDataStream<Boolean> iteration = source.iterate(3000);
- iteration.closeWith(iteration.flatMap(new IterationHead()));
+ iteration.closeWith(iteration.flatMap(new IterationHead())).addSink(new ReceiveCheckNoOpSink<Boolean>());
try {
env.execute();
@@ -503,22 +558,6 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
}
}
- public static final class NoOpSink<T> extends RichSinkFunction<T> {
- private List<T> received;
-
- public void invoke(T tuple) {
- received.add(tuple);
- }
-
- public void open(Configuration conf) {
- received = new ArrayList<T>();
- }
-
- public void close() {
- assertTrue(received.size() > 0);
- }
- }
-
public static CoMapFunction<Integer, String, String> NoOpCoMap = new CoMapFunction<Integer, String, String>() {
public String map1(Integer value) throws Exception {
@@ -530,9 +569,23 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
}
};
- public static MapFunction<Integer, Integer> NoOpIntMap = new MapFunction<Integer, Integer>() {
+ public static MapFunction<Integer, Integer> NoOpIntMap = new NoOpIntMap();
+
+ public static MapFunction<String, String> NoOpStrMap = new MapFunction<String, String>() {
+
+ public String map(String value) throws Exception {
+ return value;
+ }
+
+ };
+
+ public static CoMapFunction<Integer, Integer, Integer> NoOpIntCoMap = new CoMapFunction<Integer, Integer, Integer>() {
+
+ public Integer map1(Integer value) throws Exception {
+ return value;
+ }
- public Integer map(Integer value) throws Exception {
+ public Integer map2(Integer value) throws Exception {
return value;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
index fc78d27..8525d37 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
@@ -26,13 +26,12 @@ import java.util.List;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.streaming.util.TestListResultSink;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.junit.Test;
-public class OutputSplitterTest {
-
- private static final long MEMORYSIZE = 32;
+public class OutputSplitterTest extends StreamingMultipleProgramsTestBase {
private static ArrayList<Integer> expectedSplitterResult = new ArrayList<Integer>();
@@ -42,7 +41,8 @@ public class OutputSplitterTest {
TestListResultSink<Integer> splitterResultSink1 = new TestListResultSink<Integer>();
TestListResultSink<Integer> splitterResultSink2 = new TestListResultSink<Integer>();
- StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
env.setBufferTimeout(1);
DataStream<Integer> d1 = env.fromElements(0, 2, 4, 6, 8);
@@ -95,7 +95,8 @@ public class OutputSplitterTest {
TestListResultSink<Integer> splitterResultSink1 = new TestListResultSink<Integer>();
TestListResultSink<Integer> splitterResultSink2 = new TestListResultSink<Integer>();
- StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
env.setBufferTimeout(1);
DataStream<Integer> ds = env.fromElements(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java
index c858834..987a8fb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java
@@ -25,14 +25,18 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.NoOpIntMap;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.streaming.util.TestListResultSink;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.junit.Test;
@@ -40,10 +44,33 @@ import org.junit.Test;
/**
* IT case that tests the different stream partitioning schemes.
*/
-public class PartitionerTest {
+public class PartitionerTest extends StreamingMultipleProgramsTestBase {
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testForwardFailsLowToHighParallelism() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStream<Integer> src = env.fromElements(1, 2, 3);
+
+ // forward() cannot be used here because the parallelism would go from 1 to 3
+ src.forward().map(new NoOpIntMap());
+
+ env.execute();
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testForwardFailsHightToLowParallelism() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ // this does a rebalance that works
+ DataStream<Integer> src = env.fromElements(1, 2, 3).map(new NoOpIntMap());
+
+ // forward() cannot be used here because the parallelism would go from 3 to 1
+ src.forward().map(new NoOpIntMap()).setParallelism(1);
+
+ env.execute();
+ }
- public static final int PARALLELISM = 3;
- public static final int MEMORY_SIZE = 32;
@Test
public void partitionerTest() {
@@ -62,7 +89,9 @@ public class PartitionerTest {
new TestListResultSink<Tuple2<Integer, String>>();
- StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORY_SIZE);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(3);
+
DataStream<Tuple1<String>> src = env.fromElements(
new Tuple1<String>("a"),
new Tuple1<String>("b"),
@@ -98,12 +127,21 @@ public class PartitionerTest {
// partition broadcast
src.broadcast().map(new SubtaskIndexAssigner()).addSink(broadcastPartitionResultSink);
- // partition forward
- src.map(new SubtaskIndexAssigner()).addSink(forwardPartitionResultSink);
-
// partition rebalance
src.rebalance().map(new SubtaskIndexAssigner()).addSink(rebalancePartitionResultSink);
+ // partition forward
+ src.map(new MapFunction<Tuple1<String>, Tuple1<String>>() {
+ private static final long serialVersionUID = 1L;
+ @Override
+ public Tuple1<String> map(Tuple1<String> value) throws Exception {
+ return value;
+ }
+ })
+ .forward()
+ .map(new SubtaskIndexAssigner())
+ .addSink(forwardPartitionResultSink);
+
// partition global
src.global().map(new SubtaskIndexAssigner()).addSink(globalPartitionResultSink);
@@ -209,8 +247,8 @@ public class PartitionerTest {
new HashSet<Tuple2<Integer, String>>(globalPartitionResult));
}
- private static class SubtaskIndexAssigner
- extends RichMapFunction<Tuple1<String>, Tuple2<Integer, String>> {
+ private static class SubtaskIndexAssigner extends RichMapFunction<Tuple1<String>, Tuple2<Integer, String>> {
+ private static final long serialVersionUID = 1L;
private int indexOfSubtask;
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
index e2fe599..606259e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
@@ -37,24 +37,23 @@ import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.util.NoOpSink;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.util.SplittableIterator;
import org.junit.Test;
-public class StreamExecutionEnvironmentTest {
-
- private static final long MEMORYSIZE = 32;
- private static int PARALLELISM = 4;
+public class StreamExecutionEnvironmentTest extends StreamingMultipleProgramsTestBase {
@Test
@SuppressWarnings("unchecked")
public void testFromCollectionParallelism() {
try {
TypeInformation<Integer> typeInfo = BasicTypeInfo.INT_TYPE_INFO;
- StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Integer> dataStream1 = env.fromCollection(new DummySplittableIterator<Integer>(), typeInfo);
-
+
try {
dataStream1.setParallelism(4);
fail("should throw an exception");
@@ -62,15 +61,20 @@ public class StreamExecutionEnvironmentTest {
catch (IllegalArgumentException e) {
// expected
}
+
+ dataStream1.addSink(new NoOpSink<Integer>());
- env.fromParallelCollection(new DummySplittableIterator<Integer>(), typeInfo).setParallelism(4);
-
+ DataStreamSource<Integer> dataStream2 = env.fromParallelCollection(new DummySplittableIterator<Integer>(),
+ typeInfo).setParallelism(4);
+
+ dataStream2.addSink(new NoOpSink<Integer>());
+
String plan = env.getExecutionPlan();
-
- assertTrue("Parallelism for dataStream1 is not right.",
- plan.contains("\"contents\":\"Collection Source\",\"parallelism\":1"));
- assertTrue("Parallelism for dataStream2 is not right.",
- plan.contains("\"contents\":\"Parallel Collection Source\",\"parallelism\":4"));
+
+ assertEquals("Parallelism of collection source must be 1.", 1, env.getStreamGraph().getStreamNode(dataStream1.getId()).getParallelism());
+ assertEquals("Parallelism of parallel collection source must be 4.",
+ 4,
+ env.getStreamGraph().getStreamNode(dataStream2.getId()).getParallelism());
}
catch (Exception e) {
e.printStackTrace();
@@ -80,7 +84,7 @@ public class StreamExecutionEnvironmentTest {
@Test
public void testSources() {
- StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SourceFunction<Integer> srcFun = new SourceFunction<Integer>() {
private static final long serialVersionUID = 1L;
@@ -94,6 +98,7 @@ public class StreamExecutionEnvironmentTest {
}
};
DataStreamSource<Integer> src1 = env.addSource(srcFun);
+ src1.addSink(new NoOpSink<Integer>());
assertEquals(srcFun, getFunctionFromDataSource(src1));
List<Long> list = Arrays.asList(0L, 1L, 2L);
@@ -120,6 +125,7 @@ public class StreamExecutionEnvironmentTest {
}
private static <T> SourceFunction<T> getFunctionFromDataSource(DataStreamSource<T> dataStreamSource) {
+ dataStreamSource.addSink(new NoOpSink<T>());
AbstractUdfStreamOperator<?, ?> operator =
(AbstractUdfStreamOperator<?, ?>) getOperatorFromDataStream(dataStreamSource);
return (SourceFunction<T>) operator.getUserFunction();
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
index 7d08709..0989128 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
@@ -33,11 +33,12 @@ import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.co.CoReduceFunction;
import org.apache.flink.streaming.api.functions.co.CoWindowFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.util.Collector;
import org.junit.Test;
@SuppressWarnings("serial")
-public class TypeFillTest {
+public class TypeFillTest extends StreamingMultipleProgramsTestBase {
@Test
public void test() {
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
index cccec40..508f1a2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
@@ -30,15 +30,12 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.streaming.util.TestListResultSink;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.junit.Test;
-public class WindowCrossJoinTest implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- private static final long MEMORYSIZE = 32;
+public class WindowCrossJoinTest extends StreamingMultipleProgramsTestBase {
private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> joinExpectedResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> crossExpectedResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
@@ -55,7 +52,8 @@ public class WindowCrossJoinTest implements Serializable {
@Test
public void test() throws Exception {
- StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
env.setBufferTimeout(1);
TestListResultSink<Tuple2<Tuple2<Integer, String>, Integer>> joinResultSink =
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
index fc3e36f..b7df2ec 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
@@ -30,11 +30,11 @@ import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.SplitDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.streaming.util.TestListResultSink;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.junit.Test;
-public class DirectedOutputTest {
+public class DirectedOutputTest extends StreamingMultipleProgramsTestBase {
private static final String TEN = "ten";
private static final String ODD = "odd";
@@ -94,7 +94,8 @@ public class DirectedOutputTest {
@Test
public void outputSelectorTest() throws Exception {
- StreamExecutionEnvironment env = new TestStreamEnvironment(3, 32);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(3);
TestListResultSink<Long> evenSink = new TestListResultSink<Long>();
TestListResultSink<Long> oddAndTenSink = new TestListResultSink<Long>();
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
index 46e4327..f2c253c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.complex;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -44,7 +43,6 @@ import org.apache.flink.streaming.api.windowing.helper.Delta;
import org.apache.flink.streaming.api.windowing.helper.Time;
import org.apache.flink.streaming.api.windowing.helper.Timestamp;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.util.Collector;
import org.junit.After;
import org.junit.Before;
@@ -131,7 +129,6 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
}
})
-
.setParallelism(1).filter(new FilterFunction
<Tuple2<Long, Tuple2<String, Long>>>() {
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java
index 767eaa4..39a13b3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java
@@ -25,14 +25,15 @@ import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.junit.Test;
-public class SlotAllocationTest {
+public class SlotAllocationTest extends StreamingMultipleProgramsTestBase{
@SuppressWarnings("serial")
@Test
public void test() {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(8);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FilterFunction<Long> dummyFilter = new FilterFunction<Long>() {
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
new file mode 100644
index 0000000..fb2ef56
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
+import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
+import org.apache.flink.streaming.util.EvenOddOutputSelector;
+import org.apache.flink.streaming.util.NoOpIntMap;
+import org.apache.flink.streaming.util.NoOpSink;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link StreamGraphGenerator}. This only tests the correct translation of split/select,
+ * union and partitioning, since the other translation routines are already tested in
+ * operation-specific tests, for example in {@link org.apache.flink.streaming.api.IterateTest} for
+ * iterations.
+ */
+public class StreamGraphGeneratorTest extends StreamingMultipleProgramsTestBase {
+
+ /**
+ * This tests whether virtual Transformations behave correctly.
+ *
+ * <p>
+ * Verifies that partitioning, output selectors and selected names are correctly set in the
+ * StreamGraph when they are intermixed.
+ */
+ @Test
+ public void testVirtualTransformations() throws Exception {
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStream<Integer> source = env.fromElements(1, 10);
+
+ DataStream<Integer> rebalanceMap = source.rebalance().map(new NoOpIntMap());
+
+ // verify that only the partitioning that was set last is used
+ DataStream<Integer> broadcastMap = rebalanceMap
+ .forward()
+ .global()
+ .broadcast()
+ .map(new NoOpIntMap());
+
+ broadcastMap.addSink(new NoOpSink<Integer>());
+
+ // verify that partitioning is preserved across union and split/select
+ EvenOddOutputSelector selector1 = new EvenOddOutputSelector();
+ EvenOddOutputSelector selector2 = new EvenOddOutputSelector();
+ EvenOddOutputSelector selector3 = new EvenOddOutputSelector();
+
+ DataStream<Integer> map1Operator = rebalanceMap
+ .map(new NoOpIntMap());
+
+ DataStream<Integer> map1 = map1Operator
+ .broadcast()
+ .split(selector1)
+ .select("even");
+
+ DataStream<Integer> map2Operator = rebalanceMap
+ .map(new NoOpIntMap());
+
+ DataStream<Integer> map2 = map2Operator
+ .split(selector2)
+ .select("odd")
+ .global();
+
+ DataStream<Integer> map3Operator = rebalanceMap
+ .map(new NoOpIntMap());
+
+ DataStream<Integer> map3 = map3Operator
+ .global()
+ .split(selector3)
+ .select("even")
+ .shuffle();
+
+
+ SingleOutputStreamOperator<Integer, ?> unionedMap = map1.union(map2).union(map3)
+ .map(new NoOpIntMap());
+
+ unionedMap.addSink(new NoOpSink<Integer>());
+
+ StreamGraph graph = env.getStreamGraph();
+
+ // rebalanceMap
+ assertTrue(graph.getStreamNode(rebalanceMap.getId()).getInEdges().get(0).getPartitioner() instanceof RebalancePartitioner);
+
+ // verify that only the last partitioning takes effect
+ assertTrue(graph.getStreamNode(broadcastMap.getId()).getInEdges().get(0).getPartitioner() instanceof BroadcastPartitioner);
+ assertEquals(rebalanceMap.getId(), graph.getStreamNode(broadcastMap.getId()).getInEdges().get(0).getSourceVertex().getId());
+
+ // verify that partitioning in unions is preserved and that it works across split/select
+ assertTrue(graph.getStreamNode(map1Operator.getId()).getOutEdges().get(0).getPartitioner() instanceof BroadcastPartitioner);
+ assertTrue(graph.getStreamNode(map1Operator.getId()).getOutEdges().get(0).getSelectedNames().get(0).equals("even"));
+ assertTrue(graph.getStreamNode(map1Operator.getId()).getOutputSelectors().contains(selector1));
+
+ assertTrue(graph.getStreamNode(map2Operator.getId()).getOutEdges().get(0).getPartitioner() instanceof GlobalPartitioner);
+ assertTrue(graph.getStreamNode(map2Operator.getId()).getOutEdges().get(0).getSelectedNames().get(0).equals("odd"));
+ assertTrue(graph.getStreamNode(map2Operator.getId()).getOutputSelectors().contains(selector2));
+
+ assertTrue(graph.getStreamNode(map3Operator.getId()).getOutEdges().get(0).getPartitioner() instanceof ShufflePartitioner);
+ assertTrue(graph.getStreamNode(map3Operator.getId()).getOutEdges().get(0).getSelectedNames().get(0).equals("even"));
+ assertTrue(graph.getStreamNode(map3Operator.getId()).getOutputSelectors().contains(selector3));
+ }
+
+ /**
+ * This tests whether virtual Transformations behave correctly.
+ *
+ * Checks whether output selectors and partitioning work correctly when applied to a union.
+ */
+ @Test
+ public void testVirtualTransformations2() throws Exception {
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStream<Integer> source = env.fromElements(1, 10);
+
+ DataStream<Integer> rebalanceMap = source.rebalance().map(new NoOpIntMap());
+
+ DataStream<Integer> map1 = rebalanceMap
+ .map(new NoOpIntMap());
+
+ DataStream<Integer> map2 = rebalanceMap
+ .map(new NoOpIntMap());
+
+ DataStream<Integer> map3 = rebalanceMap
+ .map(new NoOpIntMap());
+
+ EvenOddOutputSelector selector = new EvenOddOutputSelector();
+
+ SingleOutputStreamOperator<Integer, ?> unionedMap = map1.union(map2).union(map3)
+ .broadcast()
+ .split(selector)
+ .select("foo")
+ .map(new NoOpIntMap());
+
+ unionedMap.addSink(new NoOpSink<Integer>());
+
+ StreamGraph graph = env.getStreamGraph();
+
+ // verify that the properties are correctly set on all input operators
+ assertTrue(graph.getStreamNode(map1.getId()).getOutEdges().get(0).getPartitioner() instanceof BroadcastPartitioner);
+ assertTrue(graph.getStreamNode(map1.getId()).getOutEdges().get(0).getSelectedNames().get(0).equals("foo"));
+ assertTrue(graph.getStreamNode(map1.getId()).getOutputSelectors().contains(selector));
+
+ assertTrue(graph.getStreamNode(map2.getId()).getOutEdges().get(0).getPartitioner() instanceof BroadcastPartitioner);
+ assertTrue(graph.getStreamNode(map2.getId()).getOutEdges().get(0).getSelectedNames().get(0).equals("foo"));
+ assertTrue(graph.getStreamNode(map2.getId()).getOutputSelectors().contains(selector));
+
+ assertTrue(graph.getStreamNode(map3.getId()).getOutEdges().get(0).getPartitioner() instanceof BroadcastPartitioner);
+ assertTrue(graph.getStreamNode(map3.getId()).getOutEdges().get(0).getSelectedNames().get(0).equals("foo"));
+ assertTrue(graph.getStreamNode(map3.getId()).getOutputSelectors().contains(selector));
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 4e7c963..e806428 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -21,6 +21,8 @@ import java.util.Random;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.util.InstantiationUtil;
import org.junit.Assert;
@@ -28,7 +30,7 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class StreamingJobGraphGeneratorTest {
+public class StreamingJobGraphGeneratorTest extends StreamingMultipleProgramsTestBase {
private static final Logger LOG = LoggerFactory.getLogger(StreamingJobGraphGeneratorTest.class);
@Test
@@ -36,8 +38,9 @@ public class StreamingJobGraphGeneratorTest {
final long seed = System.currentTimeMillis();
LOG.info("Test seed: {}", new Long(seed));
final Random r = new Random(seed);
-
- TestStreamEnvironment env = new TestStreamEnvironment(4, 32);
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
StreamGraph streamingJob = new StreamGraph(env);
StreamingJobGraphGenerator compiler = new StreamingJobGraphGenerator(streamingJob);
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java
index e8f0a03..bb9dad7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java
@@ -40,6 +40,7 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.junit.Test;
@@ -52,8 +53,7 @@ import org.junit.Test;
* <li>Watermarks are correctly forwarded</li>
* </ul>
*/
-public class StreamProjectTest implements Serializable {
- private static final long serialVersionUID = 1L;
+public class StreamProjectTest extends StreamingMultipleProgramsTestBase {
@Test
public void testProject() throws Exception {
@@ -95,7 +95,6 @@ public class StreamProjectTest implements Serializable {
// tests using projection from the API without explicitly specifying the types
- private static final long MEMORY_SIZE = 32;
private static HashSet<Tuple2<Long, Double>> expected = new HashSet<Tuple2<Long, Double>>();
private static HashSet<Tuple2<Long, Double>> actual = new HashSet<Tuple2<Long, Double>>();
@@ -106,7 +105,8 @@ public class StreamProjectTest implements Serializable {
expected.add(new Tuple2<Long, Double>(i, i.doubleValue()));
}
- StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORY_SIZE);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
env.generateSequence(1, 10).map(new MapFunction<Long, Tuple3<Long, Character, Double>>() {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java
index b8e9619..4c644a9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java
@@ -30,10 +30,12 @@ import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.NoOpSink;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.junit.Assert;
+import org.junit.Ignore;
import org.junit.Test;
/**
@@ -169,21 +171,4 @@ public class CoStreamFlatMapTest implements Serializable {
}
}
-
- @SuppressWarnings("unchecked")
- @Test
- public void multipleInputTest() {
- LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-
- DataStream<Integer> ds1 = env.fromElements(1, 3, 5);
- DataStream<Integer> ds2 = env.fromElements(2, 4).union(ds1);
-
- try {
- ds1.forward().union(ds2);
- fail();
- } catch (RuntimeException e) {
- // expected
- }
-
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
index 5986a30..512a0df 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
@@ -31,19 +31,17 @@ import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.streaming.util.TestListResultSink;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.util.Collector;
import org.junit.Test;
-public class SelfConnectionTest implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- private final int MEMORY_SIZE = 32;
+public class SelfConnectionTest extends StreamingMultipleProgramsTestBase {
private static List<String> expected;
@@ -51,20 +49,12 @@ public class SelfConnectionTest implements Serializable {
@Test
public void sameDataStreamTest() {
- StreamExecutionEnvironment env = new TestStreamEnvironment(3, MEMORY_SIZE);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(3);
TestListResultSink<String> resultSink = new TestListResultSink<String>();
- Timestamp<Integer> timeStamp = new Timestamp<Integer>() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public long getTimestamp(Integer value) {
- return value;
- }
-
- };
+ Timestamp<Integer> timeStamp = new IntegerTimestamp();
KeySelector keySelector = new KeySelector<Integer, Integer>() {
@@ -79,7 +69,7 @@ public class SelfConnectionTest implements Serializable {
DataStream<Integer> src = env.fromElements(1, 3, 5);
@SuppressWarnings("unused")
- DataStream<Tuple2<Integer, Integer>> dataStream =
+ DataStreamSink<Tuple2<Integer, Integer>> dataStream =
src.join(src).onWindow(50L, timeStamp, timeStamp).where(keySelector).equalTo(keySelector)
.map(new MapFunction<Tuple2<Integer, Integer>, String>() {
@@ -107,8 +97,8 @@ public class SelfConnectionTest implements Serializable {
assertEquals(expected, result);
} catch (Exception e) {
- fail();
e.printStackTrace();
+ fail();
}
}
@@ -120,7 +110,8 @@ public class SelfConnectionTest implements Serializable {
TestListResultSink<String> resultSink = new TestListResultSink<String>();
- StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORY_SIZE);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
DataStream<Integer> src = env.fromElements(1, 3, 5);
@@ -175,7 +166,8 @@ public class SelfConnectionTest implements Serializable {
TestListResultSink<String> resultSink = new TestListResultSink<String>();
- StreamExecutionEnvironment env = new TestStreamEnvironment(3, MEMORY_SIZE);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(3);
DataStream<Integer> src = env.fromElements(1, 3, 5).disableChaining();
@@ -248,4 +240,15 @@ public class SelfConnectionTest implements Serializable {
assertEquals(expected, result);
}
+
+ private static class IntegerTimestamp implements Timestamp<Integer> { // Timestamp implementation for Integer elements, declared as a named static nested class
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public long getTimestamp(Integer value) { // returns the element's own integer value as its timestamp
+ return value;
+ }
+
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowingITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowingITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowingITCase.java
index 5e6ffa2..db09373 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowingITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowingITCase.java
@@ -40,14 +40,12 @@ import org.apache.flink.streaming.api.windowing.helper.Count;
import org.apache.flink.streaming.api.windowing.helper.FullStream;
import org.apache.flink.streaming.api.windowing.helper.Time;
import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.util.Collector;
import org.junit.Test;
-public class WindowingITCase implements Serializable {
-
- private static final long serialVersionUID = 1L;
- private static final Integer MEMORYSIZE = 32;
+public class WindowingITCase extends StreamingMultipleProgramsTestBase {
@SuppressWarnings("serial")
public static class ModKey implements KeySelector<Integer, Integer> {
@@ -98,17 +96,10 @@ public class WindowingITCase implements Serializable {
KeySelector<Integer, ?> key = new ModKey(2);
- Timestamp<Integer> ts = new Timestamp<Integer>() {
-
- private static final long serialVersionUID = 1L;
+ Timestamp<Integer> ts = new IntegerTimestamp();
- @Override
- public long getTimestamp(Integer value) {
- return value;
- }
- };
-
- StreamExecutionEnvironment env = new TestStreamEnvironment(2, MEMORYSIZE);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(2);
env.disableOperatorChaining();
DataStream<Integer> source = env.fromCollection(inputs);
@@ -116,14 +107,18 @@ public class WindowingITCase implements Serializable {
source.window(Time.of(3, ts, 1)).every(Time.of(2, ts, 1)).sum(0).getDiscretizedStream()
.addSink(new TestSink1());
- source.window(Time.of(4, ts, 1)).groupBy(new ModKey(2)).mapWindow(new IdentityWindowMap())
- .flatten().addSink(new TestSink2());
+ source.window(Time.of(4, ts, 1)).groupBy(new ModKey(2))
+ .mapWindow(new IdentityWindowMap())
+ .flatten()
+ .addSink(new TestSink2()).name("TESTSIUNK2");
source.groupBy(key).window(Time.of(4, ts, 1)).sum(0).getDiscretizedStream()
.addSink(new TestSink4());
source.groupBy(new ModKey(3)).window(Count.of(2)).groupBy(new ModKey(2))
- .mapWindow(new IdentityWindowMap()).flatten().addSink(new TestSink5());
+ .mapWindow(new IdentityWindowMap())
+ .flatten()
+ .addSink(new TestSink5());
source.window(Time.of(2, ts)).every(Time.of(3, ts)).min(0).getDiscretizedStream()
.addSink(new TestSink3());
@@ -131,11 +126,13 @@ public class WindowingITCase implements Serializable {
source.groupBy(key).window(Time.of(4, ts, 1)).max(0).getDiscretizedStream()
.addSink(new TestSink6());
- source.window(Time.of(5, ts, 1)).mapWindow(new IdentityWindowMap()).flatten()
+ source.window(Time.of(5, ts, 1)).mapWindow(new IdentityWindowMap())
+ .flatten()
.addSink(new TestSink7());
source.window(Time.of(5, ts, 1)).every(Time.of(4, ts, 1)).groupBy(new ModKey(2)).sum(0)
- .getDiscretizedStream().addSink(new TestSink8());
+ .getDiscretizedStream()
+ .addSink(new TestSink8());
try {
source.window(FullStream.window()).every(Count.of(2)).getDiscretizedStream();
@@ -156,7 +153,8 @@ public class WindowingITCase implements Serializable {
source.every(Count.of(4)).sum(0).getDiscretizedStream().addSink(new TestSink11());
source.window(FullStream.window()).every(Count.of(4)).groupBy(key).sum(0)
- .getDiscretizedStream().addSink(new TestSink12());
+ .getDiscretizedStream()
+ .addSink(new TestSink12());
DataStream<Integer> source2 = env.addSource(new ParallelSourceFunction<Integer>() {
private static final long serialVersionUID = 1L;
@@ -202,12 +200,15 @@ public class WindowingITCase implements Serializable {
source3.window(Time.of(5, ts, 1)).groupBy(new ModKey(2)).sum(0).getDiscretizedStream()
.addSink(new TestSink10());
- source.map(new MapFunction<Integer, Integer>() {
- @Override
- public Integer map(Integer value) throws Exception {
- return value;
- }
- }).every(Time.of(5, ts, 1)).sum(0).getDiscretizedStream().addSink(new TestSink13());
+ source
+ .map(new MapFunction<Integer, Integer>() {
+ @Override
+ public Integer map(Integer value) throws Exception {
+ return value;
+ }
+ })
+ .every(Time.of(5, ts, 1)).sum(0).getDiscretizedStream()
+ .addSink(new TestSink13());
env.execute();
@@ -516,4 +517,13 @@ public class WindowingITCase implements Serializable {
}
+ private static class IntegerTimestamp implements Timestamp<Integer> { // Timestamp implementation for Integer elements, declared as a named static nested class
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public long getTimestamp(Integer value) { // returns the element's own integer value as its timestamp
+ return value;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatITCase.java
index 68e2a75..7ac5616 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatITCase.java
@@ -40,9 +40,9 @@ public class CsvOutputFormatITCase extends StreamingProgramTestBase {
DataStream<String> text = env.fromElements(WordCountData.TEXT);
- DataStream<Tuple2<String, Integer>> counts =
- text.flatMap(new Tokenizer())
- .groupBy(0).sum(1);
+ DataStream<Tuple2<String, Integer>> counts = text
+ .flatMap(new Tokenizer())
+ .groupBy(0).sum(1);
counts.writeAsCsv(resultPath);
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatITCase.java
index 3c48b3f..6bbcea8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatITCase.java
@@ -38,9 +38,9 @@ public class TextOutputFormatITCase extends StreamingProgramTestBase {
DataStream<String> text = env.fromElements(WordCountData.TEXT);
- DataStream<Tuple2<String, Integer>> counts =
- text.flatMap(new CsvOutputFormatITCase.Tokenizer())
- .groupBy(0).sum(1);
+ DataStream<Tuple2<String, Integer>> counts = text
+ .flatMap(new CsvOutputFormatITCase.Tokenizer())
+ .groupBy(0).sum(1);
counts.writeAsText(resultPath);
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
index 60db798..32b3455 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
@@ -51,6 +51,7 @@ import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.util.InstantiationUtil;
import org.junit.Test;
@@ -62,7 +63,7 @@ import com.google.common.collect.ImmutableMap;
* partitioned and non-partitioned user states. This test mimics the runtime
* behavior of stateful stream operators.
*/
-public class StatefulOperatorTest {
+public class StatefulOperatorTest extends StreamingMultipleProgramsTestBase {
@Test
public void simpleStateTest() throws Exception {
@@ -104,7 +105,8 @@ public class StatefulOperatorTest {
@Test
public void apiTest() throws Exception {
- StreamExecutionEnvironment env = new TestStreamEnvironment(3, 32);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(3);
KeyedDataStream<Integer> keyedStream = env.fromCollection(Arrays.asList(0, 1, 2, 3, 4, 5, 6)).keyBy(new ModKey(4));
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java
index f45125b..122aa8a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java
@@ -36,10 +36,11 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.junit.Test;
-public class StreamVertexTest {
+public class StreamVertexTest extends StreamingMultipleProgramsTestBase {
private static Map<Integer, Integer> data = new HashMap<Integer, Integer>();
@@ -86,14 +87,12 @@ public class StreamVertexTest {
}
@SuppressWarnings("unused")
- private static final int PARALLELISM = 1;
private static final int SOURCE_PARALELISM = 1;
- private static final long MEMORYSIZE = 32;
@Test
public void wrongJobGraph() {
- LocalStreamEnvironment env = StreamExecutionEnvironment
- .createLocalEnvironment(SOURCE_PARALELISM);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(SOURCE_PARALELISM);
try {
env.fromCollection(null);
@@ -155,7 +154,8 @@ public class StreamVertexTest {
@Test
public void coTest() throws Exception {
- StreamExecutionEnvironment env = new TestStreamEnvironment(SOURCE_PARALELISM, MEMORYSIZE);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(SOURCE_PARALELISM);
DataStream<String> fromStringElements = env.fromElements("aa", "bb", "cc");
DataStream<Long> generatedSequence = env.generateSequence(0, 3);
@@ -171,7 +171,8 @@ public class StreamVertexTest {
@Test
public void runStream() throws Exception {
- StreamExecutionEnvironment env = new TestStreamEnvironment(SOURCE_PARALELISM, MEMORYSIZE);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(SOURCE_PARALELISM);
env.addSource(new MySource()).setParallelism(SOURCE_PARALELISM).map(new MyTask())
.addSink(new MySink());
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java
index 89679ea..9d9d47b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java
@@ -24,12 +24,13 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.junit.Test;
import static org.junit.Assert.*;
@SuppressWarnings("serial")
-public class TranslationTest {
+public class TranslationTest extends StreamingMultipleProgramsTestBase {
@Test
public void testCheckpointModeTranslation() {
@@ -66,7 +67,8 @@ public class TranslationTest {
env.generateSequence(1, 10000000)
.addSink(new SinkFunction<Long>() {
@Override
- public void invoke(Long value) {}
+ public void invoke(Long value) {
+ }
});
return env;
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitionerTest.java
deleted file mode 100644
index 2643bba..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitionerTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.partitioner;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.junit.Before;
-import org.junit.Test;
-
-public class DistributePartitionerTest {
-
- private RebalancePartitioner<Tuple> distributePartitioner;
- private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>(null);
- private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(
- null);
-
- @Before
- public void setPartitioner() {
- distributePartitioner = new RebalancePartitioner<Tuple>(false);
- }
-
- @Test
- public void testSelectChannelsLength() {
- sd.setInstance(streamRecord);
- assertEquals(1, distributePartitioner.selectChannels(sd, 1).length);
- assertEquals(1, distributePartitioner.selectChannels(sd, 2).length);
- assertEquals(1, distributePartitioner.selectChannels(sd, 1024).length);
- }
-
- @Test
- public void testSelectChannelsInterval() {
- sd.setInstance(streamRecord);
- assertEquals(0, distributePartitioner.selectChannels(sd, 3)[0]);
- assertEquals(1, distributePartitioner.selectChannels(sd, 3)[0]);
- assertEquals(2, distributePartitioner.selectChannels(sd, 3)[0]);
- assertEquals(0, distributePartitioner.selectChannels(sd, 3)[0]);
- }
-}