You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:44 UTC
[28/51] [partial] flink git commit: [FLINK-2877] Move Streaming API
out of Staging package
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
deleted file mode 100644
index c23a4f4..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ /dev/null
@@ -1,694 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.List;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.datastream.ConnectedStreams;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.datastream.KeyedStream;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.datastream.SplitStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
-import org.apache.flink.streaming.api.functions.co.CoMapFunction;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
-import org.apache.flink.streaming.api.graph.StreamEdge;
-import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
-import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
-import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
-import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
-import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper;
-import org.apache.flink.streaming.runtime.partitioner.HashPartitioner;
-import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
-import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
-import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
-import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
-import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
-import org.apache.flink.streaming.util.NoOpSink;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-public class DataStreamTest extends StreamingMultipleProgramsTestBase {
-
-
- /**
- * Tests {@link SingleOutputStreamOperator#name(String)} functionality.
- *
- * @throws Exception
- */
- @Test
- public void testNaming() throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStream<Long> dataStream1 = env.generateSequence(0, 0).name("testSource1")
- .map(new MapFunction<Long, Long>() {
- @Override
- public Long map(Long value) throws Exception {
- return null;
- }
- }).name("testMap");
-
- DataStream<Long> dataStream2 = env.generateSequence(0, 0).name("testSource2")
- .map(new MapFunction<Long, Long>() {
- @Override
- public Long map(Long value) throws Exception {
- return null;
- }
- }).name("testMap");
-
- DataStreamSink<Long> connected = dataStream1.connect(dataStream2)
- .flatMap(new CoFlatMapFunction<Long, Long, Long>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void flatMap1(Long value, Collector<Long> out) throws Exception {
- }
-
- @Override
- public void flatMap2(Long value, Collector<Long> out) throws Exception {
- }
- }).name("testCoFlatMap")
- .windowAll(GlobalWindows.create())
- .trigger(PurgingTrigger.of(CountTrigger.of(10)))
- .fold(0L, new FoldFunction<Long, Long>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Long fold(Long accumulator, Long value) throws Exception {
- return null;
- }
- })
- .name("testWindowFold")
- .print();
-
- //test functionality through the operator names in the execution plan
- String plan = env.getExecutionPlan();
-
- assertTrue(plan.contains("testSource1"));
- assertTrue(plan.contains("testSource2"));
- assertTrue(plan.contains("testMap"));
- assertTrue(plan.contains("testMap"));
- assertTrue(plan.contains("testCoFlatMap"));
- assertTrue(plan.contains("testWindowFold"));
- }
-
- /**
- * Tests that {@link DataStream#keyBy} and {@link DataStream#partitionByHash} result in
- * different and correct topologies. Does the some for the {@link ConnectedStreams}.
- */
- @Test
- @SuppressWarnings("unchecked")
- public void testPartitioning() {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStream<Tuple2<Long, Long>> src1 = env.fromElements(new Tuple2<>(0L, 0L));
- DataStream<Tuple2<Long, Long>> src2 = env.fromElements(new Tuple2<>(0L, 0L));
- ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connected = src1.connect(src2);
-
- //Testing DataStream grouping
- DataStream<Tuple2<Long, Long>> group1 = src1.keyBy(0);
- DataStream<Tuple2<Long, Long>> group2 = src1.keyBy(1, 0);
- DataStream<Tuple2<Long, Long>> group3 = src1.keyBy("f0");
- DataStream<Tuple2<Long, Long>> group4 = src1.keyBy(new FirstSelector());
-
- int id1 = createDownStreamId(group1);
- int id2 = createDownStreamId(group2);
- int id3 = createDownStreamId(group3);
- int id4 = createDownStreamId(group4);
-
- assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), id1)));
- assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), id2)));
- assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), id3)));
- assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), id4)));
-
- assertTrue(isKeyed(group1));
- assertTrue(isKeyed(group2));
- assertTrue(isKeyed(group3));
- assertTrue(isKeyed(group4));
-
- //Testing DataStream partitioning
- DataStream<Tuple2<Long, Long>> partition1 = src1.partitionByHash(0);
- DataStream<Tuple2<Long, Long>> partition2 = src1.partitionByHash(1, 0);
- DataStream<Tuple2<Long, Long>> partition3 = src1.partitionByHash("f0");
- DataStream<Tuple2<Long, Long>> partition4 = src1.partitionByHash(new FirstSelector());
-
- int pid1 = createDownStreamId(partition1);
- int pid2 = createDownStreamId(partition2);
- int pid3 = createDownStreamId(partition3);
- int pid4 = createDownStreamId(partition4);
-
- assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), pid1)));
- assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), pid2)));
- assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), pid3)));
- assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), pid4)));
-
- assertFalse(isKeyed(partition1));
- assertFalse(isKeyed(partition3));
- assertFalse(isKeyed(partition2));
- assertFalse(isKeyed(partition4));
-
- // Testing DataStream custom partitioning
- Partitioner<Long> longPartitioner = new Partitioner<Long>() {
- @Override
- public int partition(Long key, int numPartitions) {
- return 100;
- }
- };
-
- DataStream<Tuple2<Long, Long>> customPartition1 = src1.partitionCustom(longPartitioner, 0);
- DataStream<Tuple2<Long, Long>> customPartition3 = src1.partitionCustom(longPartitioner, "f0");
- DataStream<Tuple2<Long, Long>> customPartition4 = src1.partitionCustom(longPartitioner, new FirstSelector());
-
- int cid1 = createDownStreamId(customPartition1);
- int cid2 = createDownStreamId(customPartition3);
- int cid3 = createDownStreamId(customPartition4);
-
- assertTrue(isCustomPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), cid1)));
- assertTrue(isCustomPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), cid2)));
- assertTrue(isCustomPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), cid3)));
-
- assertFalse(isKeyed(customPartition1));
- assertFalse(isKeyed(customPartition3));
- assertFalse(isKeyed(customPartition4));
-
- //Testing ConnectedStreams grouping
- ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedGroup1 = connected.keyBy(0, 0);
- Integer downStreamId1 = createDownStreamId(connectedGroup1);
-
- ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedGroup2 = connected.keyBy(new int[]{0}, new int[]{0});
- Integer downStreamId2 = createDownStreamId(connectedGroup2);
-
- ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedGroup3 = connected.keyBy("f0", "f0");
- Integer downStreamId3 = createDownStreamId(connectedGroup3);
-
- ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedGroup4 = connected.keyBy(new String[]{"f0"}, new String[]{"f0"});
- Integer downStreamId4 = createDownStreamId(connectedGroup4);
-
- ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedGroup5 = connected.keyBy(new FirstSelector(), new FirstSelector());
- Integer downStreamId5 = createDownStreamId(connectedGroup5);
-
- assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), downStreamId1)));
- assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src2.getId(), downStreamId1)));
-
- assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), downStreamId2)));
- assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src2.getId(), downStreamId2)));
-
- assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), downStreamId3)));
- assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src2.getId(), downStreamId3)));
-
- assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), downStreamId4)));
- assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src2.getId(), downStreamId4)));
-
- assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), downStreamId5)));
- assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src2.getId(), downStreamId5)));
-
- assertTrue(isKeyed(connectedGroup1));
- assertTrue(isKeyed(connectedGroup2));
- assertTrue(isKeyed(connectedGroup3));
- assertTrue(isKeyed(connectedGroup4));
- assertTrue(isKeyed(connectedGroup5));
-
- //Testing ConnectedStreams partitioning
- ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedPartition1 = connected.partitionByHash(0, 0);
- Integer connectDownStreamId1 = createDownStreamId(connectedPartition1);
-
- ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedPartition2 = connected.partitionByHash(new int[]{0}, new int[]{0});
- Integer connectDownStreamId2 = createDownStreamId(connectedPartition2);
-
- ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedPartition3 = connected.partitionByHash("f0", "f0");
- Integer connectDownStreamId3 = createDownStreamId(connectedPartition3);
-
- ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedPartition4 = connected.partitionByHash(new String[]{"f0"}, new String[]{"f0"});
- Integer connectDownStreamId4 = createDownStreamId(connectedPartition4);
-
- ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedPartition5 = connected.partitionByHash(new FirstSelector(), new FirstSelector());
- Integer connectDownStreamId5 = createDownStreamId(connectedPartition5);
-
- assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(),
- connectDownStreamId1)));
- assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src2.getId(),
- connectDownStreamId1)));
-
- assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(),
- connectDownStreamId2)));
- assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src2.getId(),
- connectDownStreamId2)));
-
- assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(),
- connectDownStreamId3)));
- assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src2.getId(),
- connectDownStreamId3)));
-
- assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(),
- connectDownStreamId4)));
- assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src2.getId(),
- connectDownStreamId4)));
-
- assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(),
- connectDownStreamId5)));
- assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src2.getId(),
- connectDownStreamId5)));
-
- assertFalse(isKeyed(connectedPartition1));
- assertFalse(isKeyed(connectedPartition2));
- assertFalse(isKeyed(connectedPartition3));
- assertFalse(isKeyed(connectedPartition4));
- assertFalse(isKeyed(connectedPartition5));
- }
-
- /**
- * Tests whether parallelism gets set.
- */
- @Test
- public void testParallelism() {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStreamSource<Tuple2<Long, Long>> src = env.fromElements(new Tuple2<>(0L, 0L));
- env.setParallelism(10);
-
- SingleOutputStreamOperator<Long, ?> map = src.map(new MapFunction<Tuple2<Long, Long>, Long>() {
- @Override
- public Long map(Tuple2<Long, Long> value) throws Exception {
- return null;
- }
- }).name("MyMap");
-
- DataStream<Long> windowed = map
- .windowAll(GlobalWindows.create())
- .trigger(PurgingTrigger.of(CountTrigger.of(10)))
- .fold(0L, new FoldFunction<Long, Long>() {
- @Override
- public Long fold(Long accumulator, Long value) throws Exception {
- return null;
- }
- });
-
- windowed.addSink(new NoOpSink<Long>());
-
- DataStreamSink<Long> sink = map.addSink(new SinkFunction<Long>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void invoke(Long value) throws Exception {
- }
- });
-
- assertEquals(1, env.getStreamGraph().getStreamNode(src.getId()).getParallelism());
- assertEquals(10, env.getStreamGraph().getStreamNode(map.getId()).getParallelism());
- assertEquals(1, env.getStreamGraph().getStreamNode(windowed.getId()).getParallelism());
- assertEquals(10,
- env.getStreamGraph().getStreamNode(sink.getTransformation().getId()).getParallelism());
-
- env.setParallelism(7);
-
- // Some parts, such as windowing rely on the fact that previous operators have a parallelism
- // set when instantiating the Discretizer. This would break if we dynamically changed
- // the parallelism of operations when changing the setting on the Execution Environment.
- assertEquals(1, env.getStreamGraph().getStreamNode(src.getId()).getParallelism());
- assertEquals(10, env.getStreamGraph().getStreamNode(map.getId()).getParallelism());
- assertEquals(1, env.getStreamGraph().getStreamNode(windowed.getId()).getParallelism());
- assertEquals(10, env.getStreamGraph().getStreamNode(sink.getTransformation().getId()).getParallelism());
-
- try {
- src.setParallelism(3);
- fail();
- } catch (IllegalArgumentException success) {
- // do nothing
- }
-
- DataStreamSource<Long> parallelSource = env.generateSequence(0, 0);
- parallelSource.addSink(new NoOpSink<Long>());
- assertEquals(7, env.getStreamGraph().getStreamNode(parallelSource.getId()).getParallelism());
-
- parallelSource.setParallelism(3);
- assertEquals(3, env.getStreamGraph().getStreamNode(parallelSource.getId()).getParallelism());
-
- map.setParallelism(2);
- assertEquals(2, env.getStreamGraph().getStreamNode(map.getId()).getParallelism());
-
- sink.setParallelism(4);
- assertEquals(4, env.getStreamGraph().getStreamNode(sink.getTransformation().getId()).getParallelism());
- }
-
- @Test
- public void testTypeInfo() {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStream<Long> src1 = env.generateSequence(0, 0);
- assertEquals(TypeExtractor.getForClass(Long.class), src1.getType());
-
- DataStream<Tuple2<Integer, String>> map = src1.map(new MapFunction<Long, Tuple2<Integer, String>>() {
- @Override
- public Tuple2<Integer, String> map(Long value) throws Exception {
- return null;
- }
- });
-
- assertEquals(TypeExtractor.getForObject(new Tuple2<>(0, "")), map.getType());
-
- DataStream<String> window = map
- .windowAll(GlobalWindows.create())
- .trigger(PurgingTrigger.of(CountTrigger.of(5)))
- .apply(new AllWindowFunction<Tuple2<Integer, String>, String, GlobalWindow>() {
- @Override
- public void apply(GlobalWindow window,
- Iterable<Tuple2<Integer, String>> values,
- Collector<String> out) throws Exception {
-
- }
- });
-
- assertEquals(TypeExtractor.getForClass(String.class), window.getType());
-
- DataStream<CustomPOJO> flatten = window
- .windowAll(GlobalWindows.create())
- .trigger(PurgingTrigger.of(CountTrigger.of(5)))
- .fold(new CustomPOJO(), new FoldFunction<String, CustomPOJO>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public CustomPOJO fold(CustomPOJO accumulator, String value) throws Exception {
- return null;
- }
- });
-
- assertEquals(TypeExtractor.getForClass(CustomPOJO.class), flatten.getType());
- }
-
- @Test
- public void operatorTest() {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStreamSource<Long> src = env.generateSequence(0, 0);
-
- MapFunction<Long, Integer> mapFunction = new MapFunction<Long, Integer>() {
- @Override
- public Integer map(Long value) throws Exception {
- return null;
- }
- };
- DataStream<Integer> map = src.map(mapFunction);
- map.addSink(new NoOpSink<Integer>());
- assertEquals(mapFunction, getFunctionForDataStream(map));
-
-
- FlatMapFunction<Long, Integer> flatMapFunction = new FlatMapFunction<Long, Integer>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void flatMap(Long value, Collector<Integer> out) throws Exception {
- }
- };
- DataStream<Integer> flatMap = src.flatMap(flatMapFunction);
- flatMap.addSink(new NoOpSink<Integer>());
- assertEquals(flatMapFunction, getFunctionForDataStream(flatMap));
-
- FilterFunction<Integer> filterFunction = new FilterFunction<Integer>() {
- @Override
- public boolean filter(Integer value) throws Exception {
- return false;
- }
- };
-
- DataStream<Integer> unionFilter = map.union(flatMap)
- .filter(filterFunction);
-
- unionFilter.addSink(new NoOpSink<Integer>());
-
- assertEquals(filterFunction, getFunctionForDataStream(unionFilter));
-
- try {
- env.getStreamGraph().getStreamEdge(map.getId(), unionFilter.getId());
- } catch (RuntimeException e) {
- fail(e.getMessage());
- }
-
- try {
- env.getStreamGraph().getStreamEdge(flatMap.getId(), unionFilter.getId());
- } catch (RuntimeException e) {
- fail(e.getMessage());
- }
-
- OutputSelector<Integer> outputSelector = new OutputSelector<Integer>() {
- @Override
- public Iterable<String> select(Integer value) {
- return null;
- }
- };
-
- SplitStream<Integer> split = unionFilter.split(outputSelector);
- split.select("dummy").addSink(new NoOpSink<Integer>());
- List<OutputSelector<?>> outputSelectors = env.getStreamGraph().getStreamNode(unionFilter.getId()).getOutputSelectors();
- assertEquals(1, outputSelectors.size());
- assertEquals(outputSelector, outputSelectors.get(0));
-
- DataStream<Integer> select = split.select("a");
- DataStreamSink<Integer> sink = select.print();
-
- StreamEdge splitEdge = env.getStreamGraph().getStreamEdge(unionFilter.getId(), sink.getTransformation().getId());
- assertEquals("a", splitEdge.getSelectedNames().get(0));
-
- ConnectedStreams<Integer, Integer> connect = map.connect(flatMap);
- CoMapFunction<Integer, Integer, String> coMapper = new CoMapFunction<Integer, Integer, String>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public String map1(Integer value) {
- return null;
- }
-
- @Override
- public String map2(Integer value) {
- return null;
- }
- };
- DataStream<String> coMap = connect.map(coMapper);
- coMap.addSink(new NoOpSink<String>());
- assertEquals(coMapper, getFunctionForDataStream(coMap));
-
- try {
- env.getStreamGraph().getStreamEdge(map.getId(), coMap.getId());
- } catch (RuntimeException e) {
- fail(e.getMessage());
- }
-
- try {
- env.getStreamGraph().getStreamEdge(flatMap.getId(), coMap.getId());
- } catch (RuntimeException e) {
- fail(e.getMessage());
- }
- }
-
- @Test
- public void sinkKeyTest() {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStreamSink<Long> sink = env.generateSequence(1, 100).print();
- assertTrue(env.getStreamGraph().getStreamNode(sink.getTransformation().getId()).getStatePartitioner() == null);
- assertTrue(env.getStreamGraph().getStreamNode(sink.getTransformation().getId()).getInEdges().get(0).getPartitioner() instanceof ForwardPartitioner);
-
- KeySelector<Long, Long> key1 = new KeySelector<Long, Long>() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public Long getKey(Long value) throws Exception {
- return (long) 0;
- }
- };
-
- DataStreamSink<Long> sink2 = env.generateSequence(1, 100).keyBy(key1).print();
-
- assertNotNull(env.getStreamGraph().getStreamNode(sink2.getTransformation().getId()).getStatePartitioner());
- assertNotNull(env.getStreamGraph().getStreamNode(sink2.getTransformation().getId()).getStateKeySerializer());
- assertNotNull(env.getStreamGraph().getStreamNode(sink2.getTransformation().getId()).getStateKeySerializer());
- assertEquals(key1, env.getStreamGraph().getStreamNode(sink2.getTransformation().getId()).getStatePartitioner());
- assertTrue(env.getStreamGraph().getStreamNode(sink2.getTransformation().getId()).getInEdges().get(0).getPartitioner() instanceof HashPartitioner);
-
- KeySelector<Long, Long> key2 = new KeySelector<Long, Long>() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public Long getKey(Long value) throws Exception {
- return (long) 0;
- }
- };
-
- DataStreamSink<Long> sink3 = env.generateSequence(1, 100).keyBy(key2).print();
-
- assertTrue(env.getStreamGraph().getStreamNode(sink3.getTransformation().getId()).getStatePartitioner() != null);
- assertEquals(key2, env.getStreamGraph().getStreamNode(sink3.getTransformation().getId()).getStatePartitioner());
- assertTrue(env.getStreamGraph().getStreamNode(sink3.getTransformation().getId()).getInEdges().get(0).getPartitioner() instanceof HashPartitioner);
- }
-
- @Test
- public void testChannelSelectors() {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStreamSource<Long> src = env.generateSequence(0, 0);
-
- DataStream<Long> broadcast = src.broadcast();
- DataStreamSink<Long> broadcastSink = broadcast.print();
- StreamPartitioner<?> broadcastPartitioner =
- env.getStreamGraph().getStreamEdge(src.getId(),
- broadcastSink.getTransformation().getId()).getPartitioner();
- assertTrue(broadcastPartitioner instanceof BroadcastPartitioner);
-
- DataStream<Long> shuffle = src.shuffle();
- DataStreamSink<Long> shuffleSink = shuffle.print();
- StreamPartitioner<?> shufflePartitioner =
- env.getStreamGraph().getStreamEdge(src.getId(),
- shuffleSink.getTransformation().getId()).getPartitioner();
- assertTrue(shufflePartitioner instanceof ShufflePartitioner);
-
- DataStream<Long> forward = src.forward();
- DataStreamSink<Long> forwardSink = forward.print();
- StreamPartitioner<?> forwardPartitioner =
- env.getStreamGraph().getStreamEdge(src.getId(),
- forwardSink.getTransformation().getId()).getPartitioner();
- assertTrue(forwardPartitioner instanceof ForwardPartitioner);
-
- DataStream<Long> rebalance = src.rebalance();
- DataStreamSink<Long> rebalanceSink = rebalance.print();
- StreamPartitioner<?> rebalancePartitioner =
- env.getStreamGraph().getStreamEdge(src.getId(),
- rebalanceSink.getTransformation().getId()).getPartitioner();
- assertTrue(rebalancePartitioner instanceof RebalancePartitioner);
-
- DataStream<Long> global = src.global();
- DataStreamSink<Long> globalSink = global.print();
- StreamPartitioner<?> globalPartitioner =
- env.getStreamGraph().getStreamEdge(src.getId(),
- globalSink.getTransformation().getId()).getPartitioner();
- assertTrue(globalPartitioner instanceof GlobalPartitioner);
- }
-
- /////////////////////////////////////////////////////////////
- // Utilities
- /////////////////////////////////////////////////////////////
-
- private static StreamOperator<?> getOperatorForDataStream(DataStream<?> dataStream) {
- StreamExecutionEnvironment env = dataStream.getExecutionEnvironment();
- StreamGraph streamGraph = env.getStreamGraph();
- return streamGraph.getStreamNode(dataStream.getId()).getOperator();
- }
-
- private static Function getFunctionForDataStream(DataStream<?> dataStream) {
- AbstractUdfStreamOperator<?, ?> operator =
- (AbstractUdfStreamOperator<?, ?>) getOperatorForDataStream(dataStream);
- return operator.getUserFunction();
- }
-
- private static Integer createDownStreamId(DataStream<?> dataStream) {
- return dataStream.print().getTransformation().getId();
- }
-
- private static boolean isKeyed(DataStream<?> dataStream) {
- return dataStream instanceof KeyedStream;
- }
-
- @SuppressWarnings("rawtypes,unchecked")
- private static Integer createDownStreamId(ConnectedStreams dataStream) {
- SingleOutputStreamOperator<?, ?> coMap = dataStream.map(new CoMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Object>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object map1(Tuple2<Long, Long> value) {
- return null;
- }
-
- @Override
- public Object map2(Tuple2<Long, Long> value) {
- return null;
- }
- });
- coMap.addSink(new NoOpSink());
- return coMap.getId();
- }
-
- private static boolean isKeyed(ConnectedStreams<?, ?> dataStream) {
- return (dataStream.getFirstInput() instanceof KeyedStream && dataStream.getSecondInput() instanceof KeyedStream);
- }
-
- private static boolean isPartitioned(StreamEdge edge) {
- return edge.getPartitioner() instanceof HashPartitioner;
- }
-
- private static boolean isCustomPartitioned(StreamEdge edge) {
- return edge.getPartitioner() instanceof CustomPartitionerWrapper;
- }
-
- private static class FirstSelector implements KeySelector<Tuple2<Long, Long>, Long> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Long getKey(Tuple2<Long, Long> value) throws Exception {
- return value.f0;
- }
- }
-
- public static class CustomPOJO {
- private String s;
- private int i;
-
- public CustomPOJO() {
- }
-
- public void setS(String s) {
- this.s = s;
- }
-
- public void setI(int i) {
- this.i = i;
- }
-
- public String getS() {
- return s;
- }
-
- public int getI() {
- return i;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
deleted file mode 100644
index bd97e84..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ /dev/null
@@ -1,614 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.flink.streaming.api;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.datastream.IterativeStream;
-import org.apache.flink.streaming.api.datastream.IterativeStream.ConnectedIterativeStreams;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.datastream.SplitStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.co.CoMapFunction;
-import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.graph.StreamEdge;
-import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.api.graph.StreamNode;
-import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
-import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
-import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
-import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
-import org.apache.flink.streaming.util.EvenOddOutputSelector;
-import org.apache.flink.streaming.util.NoOpIntMap;
-import org.apache.flink.streaming.util.ReceiveCheckNoOpSink;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-@SuppressWarnings({ "unchecked", "unused", "serial" })
-public class IterateTest extends StreamingMultipleProgramsTestBase {
-
- private static boolean iterated[];
-
- @Test(expected = UnsupportedOperationException.class)
- public void testIncorrectParallelism() throws Exception {
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStream<Integer> source = env.fromElements(1, 10);
-
- IterativeStream<Integer> iter1 = source.iterate();
- SingleOutputStreamOperator<Integer, ?> map1 = iter1.map(NoOpIntMap);
- iter1.closeWith(map1).print();
- }
-
- @Test
- public void testDoubleClosing() throws Exception {
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- // introduce dummy mapper to get to correct parallelism
- DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap);
-
- IterativeStream<Integer> iter1 = source.iterate();
-
- iter1.closeWith(iter1.map(NoOpIntMap));
- iter1.closeWith(iter1.map(NoOpIntMap));
- }
-
-
- @Test(expected = UnsupportedOperationException.class)
- public void testDifferingParallelism() throws Exception {
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- // introduce dummy mapper to get to correct parallelism
- DataStream<Integer> source = env.fromElements(1, 10)
- .map(NoOpIntMap);
-
- IterativeStream<Integer> iter1 = source.iterate();
-
-
- iter1.closeWith(iter1.map(NoOpIntMap).setParallelism(DEFAULT_PARALLELISM / 2));
-
- }
-
-
- @Test(expected = UnsupportedOperationException.class)
- public void testCoDifferingParallelism() throws Exception {
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- // introduce dummy mapper to get to correct parallelism
- DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap);
-
- ConnectedIterativeStreams<Integer, Integer> coIter = source.iterate().withFeedbackType(
- Integer.class);
-
-
- coIter.closeWith(coIter.map(NoOpIntCoMap).setParallelism(DEFAULT_PARALLELISM / 2));
-
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testClosingFromOutOfLoop() throws Exception {
-
- // this test verifies that we cannot close an iteration with a DataStream that does not
- // have the iteration in its predecessors
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- // introduce dummy mapper to get to correct parallelism
- DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap);
-
- IterativeStream<Integer> iter1 = source.iterate();
- IterativeStream<Integer> iter2 = source.iterate();
-
-
- iter2.closeWith(iter1.map(NoOpIntMap));
-
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testCoIterClosingFromOutOfLoop() throws Exception {
-
- // this test verifies that we cannot close an iteration with a DataStream that does not
- // have the iteration in its predecessors
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- // introduce dummy mapper to get to correct parallelism
- DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap);
-
- IterativeStream<Integer> iter1 = source.iterate();
- ConnectedIterativeStreams<Integer, Integer> coIter = source.iterate().withFeedbackType(
- Integer.class);
-
-
- coIter.closeWith(iter1.map(NoOpIntMap));
-
- }
-
- @Test(expected = IllegalStateException.class)
- public void testExecutionWithEmptyIteration() throws Exception {
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap);
-
- IterativeStream<Integer> iter1 = source.iterate();
-
- iter1.map(NoOpIntMap).print();
-
- env.execute();
- }
-
- @Test
- public void testImmutabilityWithCoiteration() {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap); // for rebalance
-
- IterativeStream<Integer> iter1 = source.iterate();
- // Calling withFeedbackType should create a new iteration
- ConnectedIterativeStreams<Integer, String> iter2 = iter1.withFeedbackType(String.class);
-
- iter1.closeWith(iter1.map(NoOpIntMap)).print();
- iter2.closeWith(iter2.map(NoOpCoMap)).print();
-
- StreamGraph graph = env.getStreamGraph();
-
- assertEquals(2, graph.getIterationSourceSinkPairs().size());
-
- for (Tuple2<StreamNode, StreamNode> sourceSinkPair: graph.getIterationSourceSinkPairs()) {
- assertEquals(sourceSinkPair.f0.getOutEdges().get(0).getTargetVertex(), sourceSinkPair.f1.getInEdges().get(0).getSourceVertex());
- }
- }
-
- @Test
- public void testmultipleHeadsTailsSimple() {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStream<Integer> source1 = env.fromElements(1, 2, 3, 4, 5)
- .shuffle()
- .map(NoOpIntMap).name("ParallelizeMapShuffle");
- DataStream<Integer> source2 = env.fromElements(1, 2, 3, 4, 5)
- .map(NoOpIntMap).name("ParallelizeMapRebalance");
-
- IterativeStream<Integer> iter1 = source1.union(source2).iterate();
-
- DataStream<Integer> head1 = iter1.map(NoOpIntMap).name("IterRebalanceMap").setParallelism(DEFAULT_PARALLELISM / 2);
- DataStream<Integer> head2 = iter1.map(NoOpIntMap).name("IterForwardMap");
- DataStreamSink<Integer> head3 = iter1.map(NoOpIntMap).setParallelism(DEFAULT_PARALLELISM / 2).addSink(new ReceiveCheckNoOpSink<Integer>());
- DataStreamSink<Integer> head4 = iter1.map(NoOpIntMap).addSink(new ReceiveCheckNoOpSink<Integer>());
-
- SplitStream<Integer> source3 = env.fromElements(1, 2, 3, 4, 5)
- .map(NoOpIntMap).name("EvenOddSourceMap")
- .split(new EvenOddOutputSelector());
-
- iter1.closeWith(source3.select("even").union(
- head1.rebalance().map(NoOpIntMap).broadcast(), head2.shuffle()));
-
- StreamGraph graph = env.getStreamGraph();
-
- JobGraph jg = graph.getJobGraph();
-
- assertEquals(1, graph.getIterationSourceSinkPairs().size());
-
- Tuple2<StreamNode, StreamNode> sourceSinkPair = graph.getIterationSourceSinkPairs().iterator().next();
- StreamNode itSource = sourceSinkPair.f0;
- StreamNode itSink = sourceSinkPair.f1;
-
- assertEquals(4, itSource.getOutEdges().size());
- assertEquals(3, itSink.getInEdges().size());
-
- assertEquals(itSource.getParallelism(), itSink.getParallelism());
-
- for (StreamEdge edge : itSource.getOutEdges()) {
- if (edge.getTargetVertex().getOperatorName().equals("IterRebalanceMap")) {
- assertTrue(edge.getPartitioner() instanceof RebalancePartitioner);
- } else if (edge.getTargetVertex().getOperatorName().equals("IterForwardMap")) {
- assertTrue(edge.getPartitioner() instanceof ForwardPartitioner);
- }
- }
- for (StreamEdge edge : itSink.getInEdges()) {
- if (graph.getStreamNode(edge.getSourceId()).getOperatorName().equals("ParallelizeMapShuffle")) {
- assertTrue(edge.getPartitioner() instanceof ShufflePartitioner);
- }
-
- if (graph.getStreamNode(edge.getSourceId()).getOperatorName().equals("ParallelizeMapForward")) {
- assertTrue(edge.getPartitioner() instanceof ForwardPartitioner);
- }
-
- if (graph.getStreamNode(edge.getSourceId()).getOperatorName().equals("EvenOddSourceMap")) {
- assertTrue(edge.getPartitioner() instanceof ForwardPartitioner);
- assertTrue(edge.getSelectedNames().contains("even"));
- }
- }
-
- // Test co-location
-
- JobVertex itSource1 = null;
- JobVertex itSink1 = null;
-
- for (JobVertex vertex : jg.getVertices()) {
- if (vertex.getName().contains("IterationSource")) {
- itSource1 = vertex;
- } else if (vertex.getName().contains("IterationSink")) {
-
- itSink1 = vertex;
-
- }
- }
-
- assertTrue(itSource1.getCoLocationGroup() != null);
- assertEquals(itSource1.getCoLocationGroup(), itSink1.getCoLocationGroup());
- }
-
- @Test
- public void testmultipleHeadsTailsWithTailPartitioning() {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStream<Integer> source1 = env.fromElements(1, 2, 3, 4, 5)
- .shuffle()
- .map(NoOpIntMap);
-
- DataStream<Integer> source2 = env.fromElements(1, 2, 3, 4, 5)
- .map(NoOpIntMap);
-
- IterativeStream<Integer> iter1 = source1.union(source2).iterate();
-
- DataStream<Integer> head1 = iter1.map(NoOpIntMap).name("map1");
- DataStream<Integer> head2 = iter1.map(NoOpIntMap).setParallelism(DEFAULT_PARALLELISM / 2).rebalance().name(
- "shuffle");
- DataStreamSink<Integer> head3 = iter1.map(NoOpIntMap).setParallelism(DEFAULT_PARALLELISM / 2)
- .addSink(new ReceiveCheckNoOpSink<Integer>());
- DataStreamSink<Integer> head4 = iter1.map(NoOpIntMap).addSink(new ReceiveCheckNoOpSink<Integer>());
-
- SplitStream<Integer> source3 = env.fromElements(1, 2, 3, 4, 5)
- .map(NoOpIntMap)
- .name("split")
- .split(new EvenOddOutputSelector());
-
- iter1.closeWith(
- source3.select("even").union(
- head1.map(NoOpIntMap).broadcast().name("bc"),
- head2.map(NoOpIntMap).shuffle()));
-
- StreamGraph graph = env.getStreamGraph();
-
- JobGraph jg = graph.getJobGraph();
-
- assertEquals(1, graph.getIterationSourceSinkPairs().size());
-
- Tuple2<StreamNode, StreamNode> sourceSinkPair = graph.getIterationSourceSinkPairs().iterator().next();
- StreamNode itSource = sourceSinkPair.f0;
- StreamNode itSink = sourceSinkPair.f1;
-
- assertEquals(4, itSource.getOutEdges().size());
- assertEquals(3, itSink.getInEdges().size());
-
-
- assertEquals(itSource.getParallelism(), itSink.getParallelism());
-
- for (StreamEdge edge : itSource.getOutEdges()) {
- if (edge.getTargetVertex().getOperatorName().equals("map1")) {
- assertTrue(edge.getPartitioner() instanceof ForwardPartitioner);
- assertEquals(4, edge.getTargetVertex().getParallelism());
- } else if (edge.getTargetVertex().getOperatorName().equals("shuffle")) {
- assertTrue(edge.getPartitioner() instanceof RebalancePartitioner);
- assertEquals(2, edge.getTargetVertex().getParallelism());
- }
- }
- for (StreamEdge edge : itSink.getInEdges()) {
- String tailName = edge.getSourceVertex().getOperatorName();
- if (tailName.equals("split")) {
- assertTrue(edge.getPartitioner() instanceof ForwardPartitioner);
- assertTrue(edge.getSelectedNames().contains("even"));
- } else if (tailName.equals("bc")) {
- assertTrue(edge.getPartitioner() instanceof BroadcastPartitioner);
- } else if (tailName.equals("shuffle")) {
- assertTrue(edge.getPartitioner() instanceof ShufflePartitioner);
- }
- }
-
- // Test co-location
-
- JobVertex itSource1 = null;
- JobVertex itSink1 = null;
-
- for (JobVertex vertex : jg.getVertices()) {
- if (vertex.getName().contains("IterationSource")) {
- itSource1 = vertex;
- } else if (vertex.getName().contains("IterationSink")) {
- itSink1 = vertex;
- }
- }
-
- assertTrue(itSource1.getCoLocationGroup() != null);
- assertTrue(itSink1.getCoLocationGroup() != null);
-
- assertEquals(itSource1.getCoLocationGroup(), itSink1.getCoLocationGroup());
- }
-
- @SuppressWarnings("rawtypes")
- @Test
- public void testSimpleIteration() throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- iterated = new boolean[DEFAULT_PARALLELISM];
-
- DataStream<Boolean> source = env.fromCollection(Collections.nCopies(DEFAULT_PARALLELISM * 2, false))
- .map(NoOpBoolMap).name("ParallelizeMap");
-
- IterativeStream<Boolean> iteration = source.iterate(3000);
-
- DataStream<Boolean> increment = iteration.flatMap(new IterationHead()).map(NoOpBoolMap);
-
- iteration.map(NoOpBoolMap).addSink(new ReceiveCheckNoOpSink());
-
- iteration.closeWith(increment).addSink(new ReceiveCheckNoOpSink());
-
- env.execute();
-
- for (boolean iter : iterated) {
- assertTrue(iter);
- }
-
- }
-
- @Test
- public void testCoIteration() throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(2);
-
- DataStream<String> otherSource = env.fromElements("1000", "2000")
- .map(NoOpStrMap).name("ParallelizeMap");
-
-
- ConnectedIterativeStreams<Integer, String> coIt = env.fromElements(0, 0)
- .map(NoOpIntMap).name("ParallelizeMap")
- .iterate(2000)
- .withFeedbackType("String");
-
- try {
- coIt.keyBy(1, 2);
- fail();
- } catch (InvalidProgramException e) {
- // this is expected
- }
-
- DataStream<String> head = coIt
- .flatMap(new RichCoFlatMapFunction<Integer, String, String>() {
-
- private static final long serialVersionUID = 1L;
- boolean seenFromSource = false;
-
- @Override
- public void flatMap1(Integer value, Collector<String> out) throws Exception {
- out.collect(((Integer) (value + 1)).toString());
- }
-
- @Override
- public void flatMap2(String value, Collector<String> out) throws Exception {
- Integer intVal = Integer.valueOf(value);
- if (intVal < 2) {
- out.collect(((Integer) (intVal + 1)).toString());
- }
- if (intVal == 1000 || intVal == 2000) {
- seenFromSource = true;
- }
- }
-
- @Override
- public void close() {
- assertTrue(seenFromSource);
- }
- });
-
- coIt.map(new CoMapFunction<Integer, String, String>() {
-
- @Override
- public String map1(Integer value) throws Exception {
- return value.toString();
- }
-
- @Override
- public String map2(String value) throws Exception {
- return value;
- }
- }).addSink(new ReceiveCheckNoOpSink<String>());
-
- coIt.closeWith(head.broadcast().union(otherSource));
-
- head.addSink(new TestSink()).setParallelism(1);
-
- assertEquals(1, env.getStreamGraph().getIterationSourceSinkPairs().size());
-
- env.execute();
-
- Collections.sort(TestSink.collected);
- assertEquals(Arrays.asList("1", "1", "2", "2", "2", "2"), TestSink.collected);
- }
-
- @Test
- public void testGroupByFeedback() throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(DEFAULT_PARALLELISM - 1);
-
- KeySelector<Integer, Integer> key = new KeySelector<Integer, Integer>() {
-
- @Override
- public Integer getKey(Integer value) throws Exception {
- return value % 3;
- }
- };
-
- DataStream<Integer> source = env.fromElements(1, 2, 3)
- .map(NoOpIntMap).name("ParallelizeMap");
-
- IterativeStream<Integer> it = source.keyBy(key).iterate(3000);
-
- DataStream<Integer> head = it.flatMap(new RichFlatMapFunction<Integer, Integer>() {
-
- int received = 0;
- int key = -1;
-
- @Override
- public void flatMap(Integer value, Collector<Integer> out) throws Exception {
- received++;
- if (key == -1) {
- key = value % 3;
- } else {
- assertEquals(key, value % 3);
- }
- if (value > 0) {
- out.collect(value - 1);
- }
- }
-
- @Override
- public void close() {
- assertTrue(received > 1);
- }
- });
-
- it.closeWith(head.keyBy(key).union(head.map(NoOpIntMap).keyBy(key))).addSink(new ReceiveCheckNoOpSink<Integer>());
-
- env.execute();
- }
-
- @SuppressWarnings("deprecation")
- @Test
- public void testWithCheckPointing() throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- env.enableCheckpointing();
-
- DataStream<Boolean> source = env .fromCollection(Collections.nCopies(DEFAULT_PARALLELISM * 2, false))
- .map(NoOpBoolMap).name("ParallelizeMap");
-
-
- IterativeStream<Boolean> iteration = source.iterate(3000);
-
- iteration.closeWith(iteration.flatMap(new IterationHead())).addSink(new ReceiveCheckNoOpSink<Boolean>());
-
- try {
- env.execute();
-
- // this statement should never be reached
- fail();
- } catch (UnsupportedOperationException e) {
- // expected behaviour
- }
-
- // Test force checkpointing
-
- try {
- env.enableCheckpointing(1, CheckpointingMode.EXACTLY_ONCE, false);
- env.execute();
-
- // this statement should never be reached
- fail();
- } catch (UnsupportedOperationException e) {
- // expected behaviour
- }
-
- env.enableCheckpointing(1, CheckpointingMode.EXACTLY_ONCE, true);
- env.getStreamGraph().getJobGraph();
- }
-
- public static final class IterationHead extends RichFlatMapFunction<Boolean, Boolean> {
- public void flatMap(Boolean value, Collector<Boolean> out) throws Exception {
- int indx = getRuntimeContext().getIndexOfThisSubtask();
- if (value) {
- iterated[indx] = true;
- } else {
- out.collect(true);
- }
- }
- }
-
- public static CoMapFunction<Integer, String, String> NoOpCoMap = new CoMapFunction<Integer, String, String>() {
-
- public String map1(Integer value) throws Exception {
- return value.toString();
- }
-
- public String map2(String value) throws Exception {
- return value;
- }
- };
-
- public static MapFunction<Integer, Integer> NoOpIntMap = new NoOpIntMap();
-
- public static MapFunction<String, String> NoOpStrMap = new MapFunction<String, String>() {
-
- public String map(String value) throws Exception {
- return value;
- }
-
- };
-
- public static CoMapFunction<Integer, Integer, Integer> NoOpIntCoMap = new CoMapFunction<Integer, Integer, Integer>() {
-
- public Integer map1(Integer value) throws Exception {
- return value;
- }
-
- public Integer map2(Integer value) throws Exception {
- return value;
- }
-
- };
-
- public static MapFunction<Boolean, Boolean> NoOpBoolMap = new MapFunction<Boolean, Boolean>() {
-
- public Boolean map(Boolean value) throws Exception {
- return value;
- }
-
- };
-
- public static class TestSink implements SinkFunction<String> {
-
- private static final long serialVersionUID = 1L;
- public static List<String> collected = new ArrayList<String>();
-
- @Override
- public void invoke(String value) throws Exception {
- collected.add(value);
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
deleted file mode 100644
index 8525d37..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.streaming.util.TestListResultSink;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.junit.Test;
-
-public class OutputSplitterTest extends StreamingMultipleProgramsTestBase {
-
- private static ArrayList<Integer> expectedSplitterResult = new ArrayList<Integer>();
-
- @SuppressWarnings("unchecked")
- @Test
- public void testOnMergedDataStream() throws Exception {
- TestListResultSink<Integer> splitterResultSink1 = new TestListResultSink<Integer>();
- TestListResultSink<Integer> splitterResultSink2 = new TestListResultSink<Integer>();
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- env.setBufferTimeout(1);
-
- DataStream<Integer> d1 = env.fromElements(0, 2, 4, 6, 8);
- DataStream<Integer> d2 = env.fromElements(1, 3, 5, 7, 9);
-
- d1 = d1.union(d2);
-
- d1.split(new OutputSelector<Integer>() {
- private static final long serialVersionUID = 8354166915727490130L;
-
- @Override
- public Iterable<String> select(Integer value) {
- List<String> s = new ArrayList<String>();
- if (value > 4) {
- s.add(">");
- } else {
- s.add("<");
- }
- return s;
- }
- }).select(">").addSink(splitterResultSink1);
-
- d1.split(new OutputSelector<Integer>() {
- private static final long serialVersionUID = -6822487543355994807L;
-
- @Override
- public Iterable<String> select(Integer value) {
- List<String> s = new ArrayList<String>();
- if (value % 3 == 0) {
- s.add("yes");
- } else {
- s.add("no");
- }
- return s;
- }
- }).select("yes").addSink(splitterResultSink2);
- env.execute();
-
- expectedSplitterResult.clear();
- expectedSplitterResult.addAll(Arrays.asList(5, 6, 7, 8, 9));
- assertEquals(expectedSplitterResult, splitterResultSink1.getSortedResult());
-
- expectedSplitterResult.clear();
- expectedSplitterResult.addAll(Arrays.asList(0, 3, 6, 9));
- assertEquals(expectedSplitterResult, splitterResultSink2.getSortedResult());
- }
-
- @Test
- public void testOnSingleDataStream() throws Exception {
- TestListResultSink<Integer> splitterResultSink1 = new TestListResultSink<Integer>();
- TestListResultSink<Integer> splitterResultSink2 = new TestListResultSink<Integer>();
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- env.setBufferTimeout(1);
-
- DataStream<Integer> ds = env.fromElements(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
-
- ds.split(new OutputSelector<Integer>() {
- private static final long serialVersionUID = 2524335410904414121L;
-
- @Override
- public Iterable<String> select(Integer value) {
- List<String> s = new ArrayList<String>();
- if (value % 2 == 0) {
- s.add("even");
- } else {
- s.add("odd");
- }
- return s;
- }
- }).select("even").addSink(splitterResultSink1);
-
- ds.split(new OutputSelector<Integer>() {
-
- private static final long serialVersionUID = -511693919586034092L;
-
- @Override
- public Iterable<String> select(Integer value) {
- List<String> s = new ArrayList<String>();
- if (value % 4 == 0) {
- s.add("yes");
- } else {
- s.add("no");
- }
- return s;
- }
- }).select("yes").addSink(splitterResultSink2);
- env.execute();
-
- expectedSplitterResult.clear();
- expectedSplitterResult.addAll(Arrays.asList(0, 2, 4, 6, 8));
- assertEquals(expectedSplitterResult, splitterResultSink1.getSortedResult());
-
- expectedSplitterResult.clear();
- expectedSplitterResult.addAll(Arrays.asList(0, 4, 8));
- assertEquals(expectedSplitterResult, splitterResultSink2.getSortedResult());
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java
deleted file mode 100644
index a6c6936..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.NoOpIntMap;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.streaming.util.TestListResultSink;
-
-import org.junit.Test;
-
-/**
- * IT case that tests the different stream partitioning schemes.
- */
-public class PartitionerTest extends StreamingMultipleProgramsTestBase {
-
- @Test(expected = UnsupportedOperationException.class)
- public void testForwardFailsLowToHighParallelism() throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStream<Integer> src = env.fromElements(1, 2, 3);
-
- // this doesn't work because it goes from 1 to 3
- src.forward().map(new NoOpIntMap());
-
- env.execute();
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testForwardFailsHightToLowParallelism() throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- // this does a rebalance that works
- DataStream<Integer> src = env.fromElements(1, 2, 3).map(new NoOpIntMap());
-
- // this doesn't work because it goes from 3 to 1
- src.forward().map(new NoOpIntMap()).setParallelism(1);
-
- env.execute();
- }
-
-
- @Test
- public void partitionerTest() {
-
- TestListResultSink<Tuple2<Integer, String>> hashPartitionResultSink =
- new TestListResultSink<Tuple2<Integer, String>>();
- TestListResultSink<Tuple2<Integer, String>> customPartitionResultSink =
- new TestListResultSink<Tuple2<Integer, String>>();
- TestListResultSink<Tuple2<Integer, String>> broadcastPartitionResultSink =
- new TestListResultSink<Tuple2<Integer, String>>();
- TestListResultSink<Tuple2<Integer, String>> forwardPartitionResultSink =
- new TestListResultSink<Tuple2<Integer, String>>();
- TestListResultSink<Tuple2<Integer, String>> rebalancePartitionResultSink =
- new TestListResultSink<Tuple2<Integer, String>>();
- TestListResultSink<Tuple2<Integer, String>> globalPartitionResultSink =
- new TestListResultSink<Tuple2<Integer, String>>();
-
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(3);
-
- DataStream<Tuple1<String>> src = env.fromElements(
- new Tuple1<String>("a"),
- new Tuple1<String>("b"),
- new Tuple1<String>("b"),
- new Tuple1<String>("a"),
- new Tuple1<String>("a"),
- new Tuple1<String>("c"),
- new Tuple1<String>("a")
- );
-
- // partition by hash
- src
- .partitionByHash(0)
- .map(new SubtaskIndexAssigner())
- .addSink(hashPartitionResultSink);
-
- // partition custom
- DataStream<Tuple2<Integer, String>> partitionCustom = src
- .partitionCustom(new Partitioner<String>() {
- @Override
- public int partition(String key, int numPartitions) {
- if (key.equals("c")) {
- return 2;
- } else {
- return 0;
- }
- }
- }, 0)
- .map(new SubtaskIndexAssigner());
-
- partitionCustom.addSink(customPartitionResultSink);
-
- // partition broadcast
- src.broadcast().map(new SubtaskIndexAssigner()).addSink(broadcastPartitionResultSink);
-
- // partition rebalance
- src.rebalance().map(new SubtaskIndexAssigner()).addSink(rebalancePartitionResultSink);
-
- // partition forward
- src.map(new MapFunction<Tuple1<String>, Tuple1<String>>() {
- private static final long serialVersionUID = 1L;
- @Override
- public Tuple1<String> map(Tuple1<String> value) throws Exception {
- return value;
- }
- })
- .forward()
- .map(new SubtaskIndexAssigner())
- .addSink(forwardPartitionResultSink);
-
- // partition global
- src.global().map(new SubtaskIndexAssigner()).addSink(globalPartitionResultSink);
-
- try {
- env.execute();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
-
- List<Tuple2<Integer, String>> hashPartitionResult = hashPartitionResultSink.getResult();
- List<Tuple2<Integer, String>> customPartitionResult = customPartitionResultSink.getResult();
- List<Tuple2<Integer, String>> broadcastPartitionResult = broadcastPartitionResultSink.getResult();
- List<Tuple2<Integer, String>> forwardPartitionResult = forwardPartitionResultSink.getResult();
- List<Tuple2<Integer, String>> rebalancePartitionResult = rebalancePartitionResultSink.getResult();
- List<Tuple2<Integer, String>> globalPartitionResult = globalPartitionResultSink.getResult();
-
- verifyHashPartitioning(hashPartitionResult);
- verifyCustomPartitioning(customPartitionResult);
- verifyBroadcastPartitioning(broadcastPartitionResult);
- verifyRebalancePartitioning(forwardPartitionResult);
- verifyRebalancePartitioning(rebalancePartitionResult);
- verifyGlobalPartitioning(globalPartitionResult);
- }
-
- private static void verifyHashPartitioning(List<Tuple2<Integer, String>> hashPartitionResult) {
- HashMap<String, Integer> verifier = new HashMap<String, Integer>();
- for (Tuple2<Integer, String> elem : hashPartitionResult) {
- Integer subtaskIndex = verifier.get(elem.f1);
- if (subtaskIndex == null) {
- verifier.put(elem.f1, elem.f0);
- } else if (subtaskIndex != elem.f0) {
- fail();
- }
- }
- }
-
- private static void verifyCustomPartitioning(List<Tuple2<Integer, String>> customPartitionResult) {
- for (Tuple2<Integer, String> stringWithSubtask : customPartitionResult) {
- if (stringWithSubtask.f1.equals("c")) {
- assertEquals(new Integer(2), stringWithSubtask.f0);
- } else {
- assertEquals(new Integer(0), stringWithSubtask.f0);
- }
- }
- }
-
- private static void verifyBroadcastPartitioning(List<Tuple2<Integer, String>> broadcastPartitionResult) {
- List<Tuple2<Integer, String>> expected = Arrays.asList(
- new Tuple2<Integer, String>(0, "a"),
- new Tuple2<Integer, String>(0, "b"),
- new Tuple2<Integer, String>(0, "b"),
- new Tuple2<Integer, String>(0, "a"),
- new Tuple2<Integer, String>(0, "a"),
- new Tuple2<Integer, String>(0, "c"),
- new Tuple2<Integer, String>(0, "a"),
- new Tuple2<Integer, String>(1, "a"),
- new Tuple2<Integer, String>(1, "b"),
- new Tuple2<Integer, String>(1, "b"),
- new Tuple2<Integer, String>(1, "a"),
- new Tuple2<Integer, String>(1, "a"),
- new Tuple2<Integer, String>(1, "c"),
- new Tuple2<Integer, String>(1, "a"),
- new Tuple2<Integer, String>(2, "a"),
- new Tuple2<Integer, String>(2, "b"),
- new Tuple2<Integer, String>(2, "b"),
- new Tuple2<Integer, String>(2, "a"),
- new Tuple2<Integer, String>(2, "a"),
- new Tuple2<Integer, String>(2, "c"),
- new Tuple2<Integer, String>(2, "a"));
-
- assertEquals(
- new HashSet<Tuple2<Integer, String>>(expected),
- new HashSet<Tuple2<Integer, String>>(broadcastPartitionResult));
- }
-
- private static void verifyRebalancePartitioning(List<Tuple2<Integer, String>> rebalancePartitionResult) {
- List<Tuple2<Integer, String>> expected = Arrays.asList(
- new Tuple2<Integer, String>(0, "a"),
- new Tuple2<Integer, String>(1, "b"),
- new Tuple2<Integer, String>(2, "b"),
- new Tuple2<Integer, String>(0, "a"),
- new Tuple2<Integer, String>(1, "a"),
- new Tuple2<Integer, String>(2, "c"),
- new Tuple2<Integer, String>(0, "a"));
-
- assertEquals(
- new HashSet<Tuple2<Integer, String>>(expected),
- new HashSet<Tuple2<Integer, String>>(rebalancePartitionResult));
- }
-
- private static void verifyGlobalPartitioning(List<Tuple2<Integer, String>> globalPartitionResult) {
- List<Tuple2<Integer, String>> expected = Arrays.asList(
- new Tuple2<Integer, String>(0, "a"),
- new Tuple2<Integer, String>(0, "b"),
- new Tuple2<Integer, String>(0, "b"),
- new Tuple2<Integer, String>(0, "a"),
- new Tuple2<Integer, String>(0, "a"),
- new Tuple2<Integer, String>(0, "c"),
- new Tuple2<Integer, String>(0, "a"));
-
- assertEquals(
- new HashSet<Tuple2<Integer, String>>(expected),
- new HashSet<Tuple2<Integer, String>>(globalPartitionResult));
- }
-
- private static class SubtaskIndexAssigner extends RichMapFunction<Tuple1<String>, Tuple2<Integer, String>> {
- private static final long serialVersionUID = 1L;
-
- private int indexOfSubtask;
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- RuntimeContext runtimeContext = getRuntimeContext();
- indexOfSubtask = runtimeContext.getIndexOfThisSubtask();
- }
-
- @Override
- public Tuple2<Integer, String> map(Tuple1<String> value) throws Exception {
- return new Tuple2<Integer, String>(indexOfSubtask, value.f0);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java
deleted file mode 100644
index b53649a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.core.testutils.CommonTestUtils;
-import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
-import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
-import org.apache.flink.streaming.util.SourceFunctionUtil;
-import org.junit.Test;
-
-public class SourceFunctionTest {
-
- @Test
- public void fromElementsTest() throws Exception {
- List<Integer> expectedList = Arrays.asList(1, 2, 3);
- List<Integer> actualList = SourceFunctionUtil.runSourceFunction(CommonTestUtils.createCopySerializable(
- new FromElementsFunction<Integer>(
- IntSerializer.INSTANCE,
- 1,
- 2,
- 3)));
- assertEquals(expectedList, actualList);
- }
-
- @Test
- public void fromCollectionTest() throws Exception {
- List<Integer> expectedList = Arrays.asList(1, 2, 3);
- List<Integer> actualList = SourceFunctionUtil.runSourceFunction(
- CommonTestUtils.createCopySerializable(new FromElementsFunction<Integer>(
- IntSerializer.INSTANCE,
- Arrays.asList(1, 2, 3))));
- assertEquals(expectedList, actualList);
- }
-
- @Test
- public void generateSequenceTest() throws Exception {
- List<Long> expectedList = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L);
- List<Long> actualList = SourceFunctionUtil.runSourceFunction(new StatefulSequenceSource(1,
- 7));
- assertEquals(expectedList, actualList);
- }
-
- @Test
- public void socketTextStreamTest() throws Exception {
- // TODO: does not work because we cannot set the internal socket anymore
-// List<String> expectedList = Arrays.asList("a", "b", "c");
-// List<String> actualList = new ArrayList<String>();
-//
-// byte[] data = { 'a', '\n', 'b', '\n', 'c', '\n' };
-//
-// Socket socket = mock(Socket.class);
-// when(socket.getInputStream()).thenReturn(new ByteArrayInputStream(data));
-// when(socket.isClosed()).thenReturn(false);
-// when(socket.isConnected()).thenReturn(true);
-//
-// SocketTextStreamFunction source = new SocketTextStreamFunction("", 0, '\n', 0);
-// source.open(new Configuration());
-// while (!source.reachedEnd()) {
-// actualList.add(source.next());
-// }
-// assertEquals(expectedList, actualList);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
deleted file mode 100644
index 606259e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
-import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.util.NoOpSink;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.apache.flink.util.SplittableIterator;
-import org.junit.Test;
-
-public class StreamExecutionEnvironmentTest extends StreamingMultipleProgramsTestBase {
-
- @Test
- @SuppressWarnings("unchecked")
- public void testFromCollectionParallelism() {
- try {
- TypeInformation<Integer> typeInfo = BasicTypeInfo.INT_TYPE_INFO;
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStreamSource<Integer> dataStream1 = env.fromCollection(new DummySplittableIterator<Integer>(), typeInfo);
-
- try {
- dataStream1.setParallelism(4);
- fail("should throw an exception");
- }
- catch (IllegalArgumentException e) {
- // expected
- }
-
- dataStream1.addSink(new NoOpSink<Integer>());
-
- DataStreamSource<Integer> dataStream2 = env.fromParallelCollection(new DummySplittableIterator<Integer>(),
- typeInfo).setParallelism(4);
-
- dataStream2.addSink(new NoOpSink<Integer>());
-
- String plan = env.getExecutionPlan();
-
- assertEquals("Parallelism of collection source must be 1.", 1, env.getStreamGraph().getStreamNode(dataStream1.getId()).getParallelism());
- assertEquals("Parallelism of parallel collection source must be 4.",
- 4,
- env.getStreamGraph().getStreamNode(dataStream2.getId()).getParallelism());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testSources() {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- SourceFunction<Integer> srcFun = new SourceFunction<Integer>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void run(SourceContext<Integer> ctx) throws Exception {
- }
-
- @Override
- public void cancel() {
- }
- };
- DataStreamSource<Integer> src1 = env.addSource(srcFun);
- src1.addSink(new NoOpSink<Integer>());
- assertEquals(srcFun, getFunctionFromDataSource(src1));
-
- List<Long> list = Arrays.asList(0L, 1L, 2L);
-
- DataStreamSource<Long> src2 = env.generateSequence(0, 2);
- assertTrue(getFunctionFromDataSource(src2) instanceof StatefulSequenceSource);
-
- DataStreamSource<Long> src3 = env.fromElements(0L, 1L, 2L);
- assertTrue(getFunctionFromDataSource(src3) instanceof FromElementsFunction);
-
- DataStreamSource<Long> src4 = env.fromCollection(list);
- assertTrue(getFunctionFromDataSource(src4) instanceof FromElementsFunction);
- }
-
- /////////////////////////////////////////////////////////////
- // Utilities
- /////////////////////////////////////////////////////////////
-
-
- private static StreamOperator<?> getOperatorFromDataStream(DataStream<?> dataStream) {
- StreamExecutionEnvironment env = dataStream.getExecutionEnvironment();
- StreamGraph streamGraph = env.getStreamGraph();
- return streamGraph.getStreamNode(dataStream.getId()).getOperator();
- }
-
- private static <T> SourceFunction<T> getFunctionFromDataSource(DataStreamSource<T> dataStreamSource) {
- dataStreamSource.addSink(new NoOpSink<T>());
- AbstractUdfStreamOperator<?, ?> operator =
- (AbstractUdfStreamOperator<?, ?>) getOperatorFromDataStream(dataStreamSource);
- return (SourceFunction<T>) operator.getUserFunction();
- }
-
- public static class DummySplittableIterator<T> extends SplittableIterator<T> {
- private static final long serialVersionUID = 1312752876092210499L;
-
- @SuppressWarnings("unchecked")
- @Override
- public Iterator<T>[] split(int numPartitions) {
- return (Iterator<T>[]) new Iterator<?>[0];
- }
-
- @Override
- public int getMaximumNumberOfSplits() {
- return 0;
- }
-
- @Override
- public boolean hasNext() {
- return false;
- }
-
- @Override
- public T next() {
- throw new NoSuchElementException();
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java
deleted file mode 100644
index 42febea..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.SplitStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase {
-
- private String resultPath1;
- private String resultPath2;
- private String expected1;
- private String expected2;
-
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
-
- @Before
- public void before() throws Exception {
- resultPath1 = tempFolder.newFile().toURI().toString();
- resultPath2 = tempFolder.newFile().toURI().toString();
- expected1 = "";
- expected2 = "";
- }
-
- @After
- public void after() throws Exception {
- compareResultsByLinesInMemory(expected1, resultPath1);
- compareResultsByLinesInMemory(expected2, resultPath2);
- }
-
- /**
- * Tests the proper functioning of the streaming fold operator. For this purpose, a stream
- * of Tuple2<Integer, Integer> is created. The stream is grouped according to the first tuple
- * value. Each group is folded where the second tuple value is summed up.
- *
- * @throws Exception
- */
- @Test
- public void testFoldOperation() throws Exception {
- int numElements = 10;
- int numKeys = 2;
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- DataStream<Tuple2<Integer, Integer>> sourceStream = env.addSource(new TupleSource(numElements, numKeys));
-
- SplitStream<Tuple2<Integer, Integer>> splittedResult = sourceStream
- .keyBy(0)
- .fold(0, new FoldFunction<Tuple2<Integer, Integer>, Integer>() {
- @Override
- public Integer fold(Integer accumulator, Tuple2<Integer, Integer> value) throws Exception {
- return accumulator + value.f1;
- }
- }).map(new RichMapFunction<Integer, Tuple2<Integer, Integer>>() {
- @Override
- public Tuple2<Integer, Integer> map(Integer value) throws Exception {
- return new Tuple2<Integer, Integer>(getRuntimeContext().getIndexOfThisSubtask(), value);
- }
- }).split(new OutputSelector<Tuple2<Integer, Integer>>() {
- @Override
- public Iterable<String> select(Tuple2<Integer, Integer> value) {
- List<String> output = new ArrayList<>();
-
- output.add(value.f0 + "");
-
- return output;
- }
- });
-
- splittedResult.select("0").map(new MapFunction<Tuple2<Integer,Integer>, Integer>() {
- @Override
- public Integer map(Tuple2<Integer, Integer> value) throws Exception {
- return value.f1;
- }
- }).writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
-
- splittedResult.select("1").map(new MapFunction<Tuple2<Integer, Integer>, Integer>() {
- @Override
- public Integer map(Tuple2<Integer, Integer> value) throws Exception {
- return value.f1;
- }
- }).writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE);
-
- StringBuilder builder1 = new StringBuilder();
- StringBuilder builder2 = new StringBuilder();
- int counter1 = 0;
- int counter2 = 0;
-
- for (int i = 0; i < numElements; i++) {
- if (i % 2 == 0) {
- counter1 += i;
- builder1.append(counter1 + "\n");
- } else {
- counter2 += i;
- builder2.append(counter2 + "\n");
- }
- }
-
- expected1 = builder1.toString();
- expected2 = builder2.toString();
-
- env.execute();
- }
-
- /**
- * Tests whether the fold operation can also be called with non Java serializable types.
- */
- @Test
- public void testFoldOperationWithNonJavaSerializableType() throws Exception {
- final int numElements = 10;
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- DataStream<Tuple2<Integer, NonSerializable>> input = env.addSource(new NonSerializableTupleSource(numElements));
-
- input
- .keyBy(0)
- .fold(
- new NonSerializable(42),
- new FoldFunction<Tuple2<Integer, NonSerializable>, NonSerializable>() {
- @Override
- public NonSerializable fold(NonSerializable accumulator, Tuple2<Integer, NonSerializable> value) throws Exception {
- return new NonSerializable(accumulator.value + value.f1.value);
- }
- })
- .map(new MapFunction<NonSerializable, Integer>() {
- @Override
- public Integer map(NonSerializable value) throws Exception {
- return value.value;
- }
- })
- .writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
-
- StringBuilder builder = new StringBuilder();
-
- for (int i = 0; i < numElements; i++) {
- builder.append(42 + i + "\n");
- }
-
- expected1 = builder.toString();
-
- env.execute();
- }
-
- private static class NonSerializable {
- // This makes the type non-serializable
- private final Object obj = new Object();
-
- private final int value;
-
- public NonSerializable(int value) {
- this.value = value;
- }
- }
-
- private static class NonSerializableTupleSource implements SourceFunction<Tuple2<Integer, NonSerializable>> {
- private final int numElements;
-
- public NonSerializableTupleSource(int numElements) {
- this.numElements = numElements;
- }
-
-
- @Override
- public void run(SourceContext<Tuple2<Integer, NonSerializable>> ctx) throws Exception {
- for (int i = 0; i < numElements; i++) {
- ctx.collect(new Tuple2<Integer, NonSerializable>(i, new NonSerializable(i)));
- }
- }
-
- @Override
- public void cancel() {}
- }
-
- private static class TupleSource implements SourceFunction<Tuple2<Integer, Integer>> {
-
- private final int numElements;
- private final int numKeys;
-
- public TupleSource(int numElements, int numKeys) {
- this.numElements = numElements;
- this.numKeys = numKeys;
- }
-
- @Override
- public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
- for (int i = 0; i < numElements; i++) {
- Tuple2<Integer, Integer> result = new Tuple2<>(i % numKeys, i);
- ctx.collect(result);
- }
- }
-
- @Override
- public void cancel() {
-
- }
- }
-}