You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:43 UTC
[27/51] [partial] flink git commit: [FLINK-2877] Move Streaming API
out of Staging package
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
deleted file mode 100644
index 4c0f59f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
-import org.apache.flink.streaming.api.functions.co.CoMapFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class TypeFillTest extends StreamingMultipleProgramsTestBase {
-
- @Test
- public void test() {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- try {
- env.addSource(new TestSource<Integer>()).print();
- fail();
- } catch (Exception e) {
- }
-
- DataStream<Long> source = env.generateSequence(1, 10);
-
- try {
- source.map(new TestMap<Long, Long>()).print();
- fail();
- } catch (Exception e) {
- }
- try {
- source.flatMap(new TestFlatMap<Long, Long>()).print();
- fail();
- } catch (Exception e) {
- }
- try {
- source.connect(source).map(new TestCoMap<Long, Long, Integer>()).print();
- fail();
- } catch (Exception e) {
- }
- try {
- source.connect(source).flatMap(new TestCoFlatMap<Long, Long, Integer>()).print();
- fail();
- } catch (Exception e) {
- }
-
- env.addSource(new TestSource<Integer>()).returns("Integer");
- source.map(new TestMap<Long, Long>()).returns(Long.class).print();
- source.flatMap(new TestFlatMap<Long, Long>()).returns("Long").print();
- source.connect(source).map(new TestCoMap<Long, Long, Integer>()).returns("Integer").print();
- source.connect(source).flatMap(new TestCoFlatMap<Long, Long, Integer>())
- .returns(BasicTypeInfo.INT_TYPE_INFO).print();
-
- assertEquals(BasicTypeInfo.LONG_TYPE_INFO,
- source.map(new TestMap<Long, Long>()).returns(Long.class).getType());
-
- SingleOutputStreamOperator<String, ?> map = source.map(new MapFunction<Long, String>() {
-
- @Override
- public String map(Long value) throws Exception {
- return null;
- }
- });
-
- map.print();
- try {
- map.returns("String");
- fail();
- } catch (Exception e) {
- }
-
- }
-
- private class TestSource<T> implements SourceFunction<T> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void run(SourceContext<T> ctx) throws Exception {
-
- }
-
- @Override
- public void cancel() {
-
- }
- }
-
- private class TestMap<T, O> implements MapFunction<T, O> {
- @Override
- public O map(T value) throws Exception {
- return null;
- }
- }
-
- private class TestFlatMap<T, O> implements FlatMapFunction<T, O> {
- @Override
- public void flatMap(T value, Collector<O> out) throws Exception {
- }
- }
-
- private class TestCoMap<IN1, IN2, OUT> implements CoMapFunction<IN1, IN2, OUT> {
-
- @Override
- public OUT map1(IN1 value) {
- return null;
- }
-
- @Override
- public OUT map2(IN2 value) {
- return null;
- }
-
- }
-
- private class TestCoFlatMap<IN1, IN2, OUT> implements CoFlatMapFunction<IN1, IN2, OUT> {
-
- @Override
- public void flatMap1(IN1 value, Collector<OUT> out) throws Exception {
- }
-
- @Override
- public void flatMap2(IN2 value, Collector<OUT> out) throws Exception {
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
deleted file mode 100644
index d2e24c9..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.datastream.SplitStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.streaming.util.TestListResultSink;
-import org.junit.Test;
-
-public class DirectedOutputTest extends StreamingMultipleProgramsTestBase {
-
- private static final String TEN = "ten";
- private static final String ODD = "odd";
- private static final String EVEN = "even";
- private static final String NON_SELECTED = "nonSelected";
-
- static final class MyOutputSelector implements OutputSelector<Long> {
- private static final long serialVersionUID = 1L;
-
- List<String> outputs = new ArrayList<String>();
-
- @Override
- public Iterable<String> select(Long value) {
- outputs.clear();
- if (value % 2 == 0) {
- outputs.add(EVEN);
- } else {
- outputs.add(ODD);
- }
-
- if (value == 10L) {
- outputs.add(TEN);
- }
-
- if (value == 11L) {
- outputs.add(NON_SELECTED);
- }
- return outputs;
- }
- }
-
- static final class ListSink implements SinkFunction<Long> {
- private static final long serialVersionUID = 1L;
-
- private String name;
- private transient List<Long> list;
-
- public ListSink(String name) {
- this.name = name;
- }
-
- @Override
- public void invoke(Long value) {
- list.add(value);
- }
-
- private void readObject(java.io.ObjectInputStream in) throws IOException,
- ClassNotFoundException {
- in.defaultReadObject();
- outputs.put(name, new ArrayList<Long>());
- this.list = outputs.get(name);
- }
-
- }
-
- private static Map<String, List<Long>> outputs = new HashMap<String, List<Long>>();
-
- @Test
- public void outputSelectorTest() throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(3);
-
- TestListResultSink<Long> evenSink = new TestListResultSink<Long>();
- TestListResultSink<Long> oddAndTenSink = new TestListResultSink<Long>();
- TestListResultSink<Long> evenAndOddSink = new TestListResultSink<Long>();
- TestListResultSink<Long> allSink = new TestListResultSink<Long>();
-
- SplitStream<Long> source = env.generateSequence(1, 11).split(new MyOutputSelector());
- source.select(EVEN).addSink(evenSink);
- source.select(ODD, TEN).addSink(oddAndTenSink);
- source.select(EVEN, ODD).addSink(evenAndOddSink);
- source.addSink(allSink);
-
- env.execute();
- assertEquals(Arrays.asList(2L, 4L, 6L, 8L, 10L), evenSink.getSortedResult());
- assertEquals(Arrays.asList(1L, 3L, 5L, 7L, 9L, 10L, 11L), oddAndTenSink.getSortedResult());
- assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L),
- evenAndOddSink.getSortedResult());
- assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L),
- allSink.getSortedResult());
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java
deleted file mode 100644
index a3d89f2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.junit.Test;
-
-public class OutputSelectorTest {
-
- static final class MyOutputSelector implements OutputSelector<Tuple1<Integer>> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public Iterable<String> select(Tuple1<Integer> tuple) {
-
- String[] outputs = new String[tuple.f0];
-
- for (Integer i = 0; i < tuple.f0; i++) {
- outputs[i] = i.toString();
- }
- return Arrays.asList(outputs);
- }
- }
-
- @Test
- public void testGetOutputs() {
- OutputSelector<Tuple1<Integer>> selector = new MyOutputSelector();
- List<String> expectedOutputs = new ArrayList<String>();
- expectedOutputs.add("0");
- expectedOutputs.add("1");
- assertEquals(expectedOutputs, selector.select(new Tuple1<Integer>(2)));
- expectedOutputs.add("2");
- assertEquals(expectedOutputs, selector.select(new Tuple1<Integer>(3)));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
deleted file mode 100644
index 020dda3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
+++ /dev/null
@@ -1,837 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.complex;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.datastream.IterativeStream;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.datastream.SplitStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.TimestampExtractor;
-import org.apache.flink.streaming.api.functions.co.CoMapFunction;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
-import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
-import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.util.Collector;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Calendar;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-@SuppressWarnings("serial")
-public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
-
- // *************************************************************************
- // GENERAL SETUP
- // *************************************************************************
-
- private String resultPath1;
- private String resultPath2;
- private String expected1;
- private String expected2;
-
-
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
-
- @Before
- public void before() throws Exception {
- resultPath1 = tempFolder.newFile().toURI().toString();
- resultPath2 = tempFolder.newFile().toURI().toString();
- expected1 = "";
- expected2 = "";
- }
-
- @After
- public void after() throws Exception {
- compareResultsByLinesInMemory(expected1, resultPath1);
- compareResultsByLinesInMemory(expected2, resultPath2);
- }
-
- // *************************************************************************
- // INTEGRATION TESTS
- // *************************************************************************
-
- @Test
- public void complexIntegrationTest1() throws Exception {
- //Testing data stream splitting with tuples
-
- expected1 = "";
- for (int i = 0; i < 8; i++) {
- expected1 += "(10,(a,1))\n";
- }
- //i == 8
- expected1 += "(10,(a,1))";
-
- expected2 = "";
- for (int i = 0; i < 18; i++) {
- expected2 += "(20,(a,1))\n";
- }
- //i == 18
- expected2 += "(20,(a,1))";
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- DataStream<Tuple2<Long, Tuple2<String, Long>>> sourceStream1 = env.addSource(new TupleSource()).setParallelism(1);
-
- IterativeStream<Tuple2<Long, Tuple2<String, Long>>> it = sourceStream1.map(new MapFunction<Tuple2<Long, Tuple2<String, Long>>,Tuple2<Long, Tuple2<String, Long>>>(){
-
- Tuple2<Long, Tuple2<String, Long>> result = new Tuple2<>(
- 0L, new Tuple2<>("", 0L));
-
- @Override
- public Tuple2<Long, Tuple2<String, Long>> map(
- Tuple2<Long, Tuple2<String, Long>> value) throws Exception {
- result.f0 = result.f0 + value.f0;
- result.f1 = value.f1;
- return result;
- }
-
- })
- .setParallelism(1).filter(new FilterFunction
- <Tuple2<Long, Tuple2<String, Long>>>() {
-
- @Override
- public boolean filter(Tuple2<Long, Tuple2<String, Long>> value) throws Exception {
- return value.f0 < 20;
- }
- }).iterate(5000);
-
- SplitStream<Tuple2<Long, Tuple2<String, Long>>> step = it.map(new IncrementMap()).split(new
- MyOutputSelector());
- it.closeWith(step.select("iterate"));
-
- step.select("firstOutput")
- .writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
- step.select("secondOutput")
- .writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
- }
-
- // Disabled, because it depends on strange behaviour, for example of the sum() function.
- // This test evens fails, for example, if the order of only two lines in the "input" is changed.
- @SuppressWarnings("unchecked")
- @Ignore
- @Test
- public void complexIntegrationTest2() throws Exception {
- //Testing POJO source, grouping by multiple filds and windowing with timestamp
-
- expected1 = "water_melon-b\n" + "water_melon-b\n" + "water_melon-b\n" + "water_melon-b\n" +
- "water_melon-b\n" + "water_melon-b\n" + "water_melon-b\n" + "water_melon-b\n" + "water_melon-b\n" +
- "water_melon-b\n" + "water_melon-b\n" + "water_melon-b\n" + "water_melon-b\n" + "water_melon-b\n" +
- "water_melon-b\n" + "water_melon-b\n" + "water_melon-b\n" + "water_melon-b\n" + "water_melon-b\n" +
- "water_melon-b\n" + "orange-b\n" + "orange-b\n" + "orange-b\n" + "orange-b\n" + "orange-b\n" +
- "orange-b\n" + "orange-c\n" + "orange-c\n" + "orange-c\n" + "orange-c\n" + "orange-d\n" + "orange-d\n" +
- "peach-d\n" + "peach-d\n";
-
- List<Tuple5<Integer, String, Character, Double, Boolean>> input = Arrays.asList(
- new Tuple5<>(1, "apple", 'j', 0.1, false),
- new Tuple5<>(1, "peach", 'b', 0.8, false),
- new Tuple5<>(1, "orange", 'c', 0.7, true),
- new Tuple5<>(2, "apple", 'd', 0.5, false),
- new Tuple5<>(2, "peach", 'j', 0.6, false),
- new Tuple5<>(3, "orange", 'b', 0.2, true),
- new Tuple5<>(6, "apple", 'c', 0.1, false),
- new Tuple5<>(7, "peach", 'd', 0.4, false),
- new Tuple5<>(8, "orange", 'j', 0.2, true),
- new Tuple5<>(10, "apple", 'b', 0.1, false),
- new Tuple5<>(10, "peach", 'c', 0.5, false),
- new Tuple5<>(11, "orange", 'd', 0.3, true),
- new Tuple5<>(11, "apple", 'j', 0.3, false),
- new Tuple5<>(12, "peach", 'b', 0.9, false),
- new Tuple5<>(13, "orange", 'c', 0.7, true),
- new Tuple5<>(15, "apple", 'd', 0.2, false),
- new Tuple5<>(16, "peach", 'j', 0.8, false),
- new Tuple5<>(16, "orange", 'b', 0.8, true),
- new Tuple5<>(16, "apple", 'c', 0.1, false),
- new Tuple5<>(17, "peach", 'd', 1.0, true));
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.getConfig().enableTimestamps();
-
- SingleOutputStreamOperator<Tuple5<Integer, String, Character, Double, Boolean>, DataStreamSource<Tuple5<Integer, String, Character, Double, Boolean>>> sourceStream21 = env.fromCollection(input);
- DataStream<OuterPojo> sourceStream22 = env.addSource(new PojoSource());
-
- sourceStream21
- .assignTimestamps(new MyTimestampExtractor())
- .keyBy(2, 2)
- .timeWindow(Time.of(10, TimeUnit.MILLISECONDS), Time.of(4, TimeUnit.MILLISECONDS))
- .maxBy(3)
- .map(new MyMapFunction2())
- .flatMap(new MyFlatMapFunction())
- .connect(sourceStream22)
- .map(new MyCoMapFunction())
- .writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void complexIntegrationTest3() throws Exception {
- //Heavy prime factorisation with maps and flatmaps
-
- expected1 = "541\n" + "1223\n" + "3319\n" + "5851\n" + "1987\n" + "8387\n" + "15907\n" + "10939\n" +
- "4127\n" + "2477\n" + "6737\n" + "13421\n" + "4987\n" + "4999\n" + "18451\n" + "9283\n" + "7499\n" +
- "16937\n" + "11927\n" + "9973\n" + "14431\n" + "19507\n" + "12497\n" + "17497\n" + "14983\n" +
- "19997\n";
-
- expected1 = "541\n" + "1223\n" + "1987\n" + "2741\n" + "3571\n" + "10939\n" + "4409\n" +
- "5279\n" + "11927\n" + "6133\n" + "6997\n" + "12823\n" + "7919\n" + "8831\n" +
- "13763\n" + "9733\n" + "9973\n" + "14759\n" + "15671\n" + "16673\n" + "17659\n" +
- "18617\n" + "19697\n" + "19997\n";
-
- for (int i = 2; i < 100; i++) {
- expected2 += "(" + i + "," + 20000 / i + ")\n";
- }
- for (int i = 19901; i < 20000; i++) {
- expected2 += "(" + i + "," + 20000 / i + ")\n";
- }
- //i == 20000
- expected2 += "(" + 20000 + "," + 1 + ")";
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- // set to parallelism 1 because otherwise we don't know which elements go to which parallel
- // count-window.
- env.setParallelism(1);
-
- env.setBufferTimeout(0);
-
- DataStream<Long> sourceStream31 = env.generateSequence(1, 10000);
- DataStream<Long> sourceStream32 = env.generateSequence(10001, 20000);
-
- sourceStream31.filter(new PrimeFilterFunction())
- .windowAll(GlobalWindows.create())
- .trigger(PurgingTrigger.of(CountTrigger.of(100)))
- .max(0)
- .union(sourceStream32.filter(new PrimeFilterFunction())
- .windowAll(GlobalWindows.create())
- .trigger(PurgingTrigger.of(CountTrigger.of(100)))
- .max(0))
- .writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
-
- sourceStream31.flatMap(new DivisorsFlatMapFunction())
- .union(sourceStream32.flatMap(new DivisorsFlatMapFunction())).map(new MapFunction<Long, Tuple2<Long,
- Integer>>() {
-
- @Override
- public Tuple2<Long, Integer> map(Long value) throws Exception {
- return new Tuple2<>(value, 1);
- }
- })
- .keyBy(0)
- .window(GlobalWindows.create())
- .trigger(PurgingTrigger.of(CountTrigger.of(10_000)))
- .sum(1)
- .filter(new FilterFunction<Tuple2<Long, Integer>>() {
-
- @Override
- public boolean filter(Tuple2<Long, Integer> value) throws Exception {
- return value.f0 < 100 || value.f0 > 19900;
- }
- })
- .writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
- }
-
- @Test
- @Ignore
- public void complexIntegrationTest4() throws Exception {
- //Testing mapping and delta-policy windowing with custom class
-
- expected1 = "((100,100),0)\n" + "((120,122),5)\n" + "((121,125),6)\n" + "((138,144),9)\n" +
- "((139,147),10)\n" + "((156,166),13)\n" + "((157,169),14)\n" + "((174,188),17)\n" + "((175,191),18)\n" +
- "((192,210),21)\n" + "((193,213),22)\n" + "((210,232),25)\n" + "((211,235),26)\n" + "((228,254),29)\n" +
- "((229,257),30)\n" + "((246,276),33)\n" + "((247,279),34)\n" + "((264,298),37)\n" + "((265,301),38)\n" +
- "((282,320),41)\n" + "((283,323),42)\n" + "((300,342),45)\n" + "((301,345),46)\n" + "((318,364),49)\n" +
- "((319,367),50)\n" + "((336,386),53)\n" + "((337,389),54)\n" + "((354,408),57)\n" + "((355,411),58)\n" +
- "((372,430),61)\n" + "((373,433),62)\n" + "((390,452),65)\n" + "((391,455),66)\n" + "((408,474),69)\n" +
- "((409,477),70)\n" + "((426,496),73)\n" + "((427,499),74)\n" + "((444,518),77)\n" + "((445,521),78)\n" +
- "((462,540),81)\n" + "((463,543),82)\n" + "((480,562),85)\n" + "((481,565),86)\n" + "((498,584),89)\n" +
- "((499,587),90)\n" + "((516,606),93)\n" + "((517,609),94)\n" + "((534,628),97)\n" + "((535,631),98)";
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
-
- env.addSource(new RectangleSource())
- .global()
- .map(new RectangleMapFunction())
- .windowAll(GlobalWindows.create())
- .trigger(PurgingTrigger.of(DeltaTrigger.of(0.0, new MyDelta())))
- .apply(new MyWindowMapFunction())
- .writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
- }
-
- private static class MyDelta implements DeltaFunction<Tuple2<Rectangle, Integer>> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public double getDelta(Tuple2<Rectangle, Integer> oldDataPoint, Tuple2<Rectangle,
- Integer> newDataPoint) {
- return (newDataPoint.f0.b - newDataPoint.f0.a) - (oldDataPoint.f0.b - oldDataPoint.f0.a);
- }
- }
-
-
- @Test
- public void complexIntegrationTest5() throws Exception {
- //Turning on and off chaining
-
- expected1 = "1\n" + "2\n" + "2\n" + "3\n" + "3\n" + "3\n" + "4\n" + "4\n" + "4\n" + "4\n" + "5\n" + "5\n" +
- "5\n" + "5\n" + "5\n";
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- // Set to parallelism 1 to make it deterministic, otherwise, it is not clear which
- // elements will go to which parallel instance of the fold
- env.setParallelism(1);
-
- env.setBufferTimeout(0);
-
- DataStream<Long> dataStream51 = env.generateSequence(1, 5)
- .map(new MapFunction<Long, Long>() {
-
- @Override
- public Long map(Long value) throws Exception {
- return value;
- }
- }).startNewChain()
- .filter(new FilterFunction<Long>() {
-
- @Override
- public boolean filter(Long value) throws Exception {
- return true;
- }
- }).disableChaining()
- .flatMap(new SquareFlatMapFunction());
-
- DataStream<Long> dataStream53 = dataStream51.map(new MapFunction<Long, Long>() {
-
- @Override
- public Long map(Long value) throws Exception {
- return value;
- }
- });
-
-
- dataStream53.writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
- }
-
-
- @Test
- @Ignore
- public void complexIntegrationTest6() throws Exception {
- //Testing java collections and date-time types
-
- expected1 = "(6,(a,6))\n" + "(6,(b,3))\n" + "(6,(c,4))\n" + "(6,(d,2))\n" + "(6,(f,2))\n" +
- "(7,(a,1))\n" + "(7,(b,2))\n" + "(7,(c,3))\n" + "(7,(d,1))\n" + "(7,(e,1))\n" + "(7,(f,1))\n" +
- "(8,(a,6))\n" + "(8,(b,4))\n" + "(8,(c,5))\n" + "(8,(d,1))\n" + "(8,(e,2))\n" + "(8,(f,2))\n" +
- "(9,(a,4))\n" + "(9,(b,4))\n" + "(9,(c,7))\n" + "(9,(d,3))\n" + "(9,(e,1))\n" + "(9,(f,2))\n" +
- "(10,(a,3))\n" + "(10,(b,2))\n" + "(10,(c,3))\n" + "(10,(d,2))\n" + "(10,(e,1))\n" + "(10,(f,1))";
- expected2 = "[a, a, c, c, d, f]\n" + "[a, b, b, d]\n" + "[a, a, a, b, c, c, f]\n" + "[a, d, e]\n" +
- "[b, b, c, c, c, f]\n" + "[a, a, a, a, b, b, c, c, e]\n" + "[a, a, b, b, c, c, c, d, e, f, f]\n" +
- "[a, a, a, b, c, c, c, d, d, f]\n" + "[a, b, b, b, c, c, c, c, d, e, f]\n" +
- "[a, a, a, b, b, c, c, c, d, d, e, f]";
-
- SimpleDateFormat ft = new SimpleDateFormat("dd-MM-yyyy");
-
- ArrayList<Tuple2<Date, HashMap<Character, Integer>>> sales = new ArrayList<>();
- HashMap<Character, Integer> sale1 = new HashMap<>();
- sale1.put('a', 2);
- sale1.put('c', 2);
- sale1.put('d', 1);
- sale1.put('f', 1);
- sales.add(new Tuple2<>(ft.parse("03-06-2014"), sale1));
-
- HashMap<Character, Integer> sale2 = new HashMap<>();
- sale2.put('a', 1);
- sale2.put('b', 2);
- sale2.put('d', 1);
- sales.add(new Tuple2<>(ft.parse("10-06-2014"), sale2));
-
- HashMap<Character, Integer> sale3 = new HashMap<>();
- sale3.put('a', 3);
- sale3.put('b', 1);
- sale3.put('c', 2);
- sale3.put('f', 1);
- sales.add(new Tuple2<>(ft.parse("29-06-2014"), sale3));
-
- HashMap<Character, Integer> sale4 = new HashMap<>();
- sale4.put('a', 1);
- sale4.put('d', 1);
- sale4.put('e', 1);
- sales.add(new Tuple2<>(ft.parse("15-07-2014"), sale4));
-
- HashMap<Character, Integer> sale5 = new HashMap<>();
- sale5.put('b', 2);
- sale5.put('c', 3);
- sale5.put('f', 1);
- sales.add(new Tuple2<>(ft.parse("24-07-2014"), sale5));
-
- HashMap<Character, Integer> sale6 = new HashMap<>();
- sale6.put('a', 4);
- sale6.put('b', 2);
- sale6.put('c', 2);
- sale6.put('e', 1);
- sales.add(new Tuple2<>(ft.parse("17-08-2014"), sale6));
-
- HashMap<Character, Integer> sale7 = new HashMap<>();
- sale7.put('a', 2);
- sale7.put('b', 2);
- sale7.put('c', 3);
- sale7.put('d', 1);
- sale7.put('e', 1);
- sale7.put('f', 2);
- sales.add(new Tuple2<>(ft.parse("27-08-2014"), sale7));
-
- HashMap<Character, Integer> sale8 = new HashMap<>();
- sale8.put('a', 3);
- sale8.put('b', 1);
- sale8.put('c', 3);
- sale8.put('d', 2);
- sale8.put('f', 1);
- sales.add(new Tuple2<>(ft.parse("16-09-2014"), sale8));
-
- HashMap<Character, Integer> sale9 = new HashMap<>();
- sale9.put('a', 1);
- sale9.put('b', 3);
- sale9.put('c', 4);
- sale9.put('d', 1);
- sale9.put('e', 1);
- sale9.put('f', 1);
- sales.add(new Tuple2<>(ft.parse("25-09-2014"), sale9));
-
- HashMap<Character, Integer> sale10 = new HashMap<>();
- sale10.put('a', 3);
- sale10.put('b', 2);
- sale10.put('c', 3);
- sale10.put('d', 2);
- sale10.put('e', 1);
- sale10.put('f', 1);
- sales.add(new Tuple2<>(ft.parse("01-10-2014"), sale10));
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.getConfig().enableTimestamps();
-
- DataStream<Tuple2<Date, HashMap<Character, Integer>>> sourceStream6 = env.fromCollection(sales);
- sourceStream6
- .assignTimestamps(new Timestamp6())
- .timeWindowAll(Time.of(1, TimeUnit.MILLISECONDS))
- .reduce(new SalesReduceFunction())
- .flatMap(new FlatMapFunction6())
- .writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
-
- sourceStream6.map(new MapFunction6())
- .writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE);
-
- env.execute();
-
- }
-
- // *************************************************************************
- // FUNCTIONS
- // *************************************************************************
-
- private static class MyMapFunction2 implements MapFunction<Tuple5<Integer, String, Character, Double, Boolean>, Tuple4<Integer, String,
- Double, Boolean>> {
-
- @Override
- public Tuple4<Integer, String, Double, Boolean> map(Tuple5<Integer, String, Character, Double,
- Boolean> value) throws Exception {
- return new Tuple4<>(value.f0, value.f1 + "-" + value.f2,
- value.f3, value.f4);
- }
-
- }
-
- private static class PojoSource implements SourceFunction<OuterPojo> {
- private static final long serialVersionUID = 1L;
-
- long cnt = 0;
-
- @Override
- public void run(SourceContext<OuterPojo> ctx) throws Exception {
- for (int i = 0; i < 20; i++) {
- OuterPojo result = new OuterPojo(new InnerPojo(cnt / 2, "water_melon-b"), 2L);
- ctx.collect(result);
- }
- }
-
- @Override
- public void cancel() {
-
- }
- }
-
- private static class TupleSource implements SourceFunction<Tuple2<Long, Tuple2<String, Long>>> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void run(SourceContext<Tuple2<Long, Tuple2<String, Long>>> ctx) throws Exception {
- for (int i = 0; i < 20; i++) {
- Tuple2<Long, Tuple2<String, Long>> result = new Tuple2<>(1L, new Tuple2<>("a", 1L));
- ctx.collect(result);
- }
- }
-
- @Override
- public void cancel() {
-
- }
- }
-
- private class IncrementMap implements MapFunction<Tuple2<Long, Tuple2<String, Long>>, Tuple2<Long, Tuple2<String,
- Long>>> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Tuple2<Long, Tuple2<String, Long>> map(Tuple2<Long, Tuple2<String, Long>> value) throws Exception {
- return new Tuple2<>(value.f0 + 1, value.f1);
- }
- }
-
- private static class MyTimestampExtractor implements TimestampExtractor<Tuple5<Integer, String, Character, Double, Boolean>> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public long extractTimestamp(Tuple5<Integer, String, Character, Double, Boolean> value, long currentTimestamp) {
- return (long) value.f0;
- }
-
- @Override
- public long extractWatermark(Tuple5<Integer, String, Character, Double, Boolean> value,
- long currentTimestamp) {
- return (long) value.f0 - 1;
- }
-
- @Override
- public long getCurrentWatermark() {
- return Long.MIN_VALUE;
- }
- }
-
- private static class MyFlatMapFunction implements FlatMapFunction<Tuple4<Integer, String, Double,
- Boolean>, OuterPojo> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void flatMap(Tuple4<Integer, String, Double, Boolean> value, Collector<OuterPojo> out) throws
- Exception {
- if (value.f3) {
- for (int i = 0; i < 2; i++) {
- out.collect(new OuterPojo(new InnerPojo((long) value.f0, value.f1), (long) i));
- }
- }
- }
- }
-
- private class MyCoMapFunction implements CoMapFunction<OuterPojo, OuterPojo, String> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public String map1(OuterPojo value) {
- return value.f0.f1;
- }
-
- @Override
- public String map2(OuterPojo value) {
- return value.f0.f1;
- }
- }
-
- private class MyOutputSelector implements OutputSelector<Tuple2<Long, Tuple2<String, Long>>> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Iterable<String> select(Tuple2<Long, Tuple2<String, Long>> value) {
- List<String> output = new ArrayList<>();
- if (value.f0 == 10) {
- output.add("iterate");
- output.add("firstOutput");
- } else if (value.f0 == 20) {
- output.add("secondOutput");
- } else {
- output.add("iterate");
- }
- return output;
- }
- }
-
- private static class PrimeFilterFunction implements FilterFunction<Long> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public boolean filter(Long value) throws Exception {
- if (value < 2) {
- return false;
- } else {
- for (long i = 2; i < value; i++) {
- if (value % i == 0) {
- return false;
- }
- }
- }
- return true;
- }
- }
-
- private static class DivisorsFlatMapFunction implements FlatMapFunction<Long, Long> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void flatMap(Long value, Collector<Long> out) throws Exception {
- for (long i = 2; i <= value; i++) {
- if (value % i == 0) {
- out.collect(i);
- }
- }
- }
- }
-
- private static class RectangleSource extends RichSourceFunction<Rectangle> {
- private static final long serialVersionUID = 1L;
- private transient Rectangle rectangle;
-
- public void open(Configuration parameters) throws Exception {
- rectangle = new Rectangle(100, 100);
- }
-
- @Override
- public void run(SourceContext<Rectangle> ctx) throws Exception {
- // emit once as the initializer of the delta trigger
- ctx.collect(rectangle);
- for (int i = 0; i < 100; i++) {
- ctx.collect(rectangle);
- rectangle = rectangle.next();
- }
- }
-
- @Override
- public void cancel() {
- }
- }
-
- private static class RectangleMapFunction implements MapFunction<Rectangle, Tuple2<Rectangle, Integer>> {
- private static final long serialVersionUID = 1L;
- private int counter = 0;
-
- @Override
- public Tuple2<Rectangle, Integer> map(Rectangle value) throws Exception {
- return new Tuple2<>(value, counter++);
- }
- }
-
- private static class MyWindowMapFunction implements AllWindowFunction<Tuple2<Rectangle, Integer>, Tuple2<Rectangle, Integer>, GlobalWindow> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void apply(GlobalWindow window, Iterable<Tuple2<Rectangle, Integer>> values, Collector<Tuple2<Rectangle,
- Integer>> out) throws Exception {
- out.collect(values.iterator().next());
- }
- }
-
- private static class SquareFlatMapFunction implements FlatMapFunction<Long, Long> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void flatMap(Long value, Collector<Long> out) throws Exception {
- for (long i = 0; i < value; i++) {
- out.collect(value);
- }
- }
- }
-
- private static class Timestamp6 implements TimestampExtractor<Tuple2<Date, HashMap<Character, Integer>>> {
-
- @Override
- public long extractTimestamp(Tuple2<Date, HashMap<Character, Integer>> value,
- long currentTimestamp) {
- Calendar cal = Calendar.getInstance();
- cal.setTime(value.f0);
- return 12 * (cal.get(Calendar.YEAR)) + cal.get(Calendar.MONTH);
- }
-
- @Override
- public long extractWatermark(Tuple2<Date, HashMap<Character, Integer>> value,
- long currentTimestamp) {
- Calendar cal = Calendar.getInstance();
- cal.setTime(value.f0);
- return 12 * (cal.get(Calendar.YEAR)) + cal.get(Calendar.MONTH) - 1;
- }
-
- @Override
- public long getCurrentWatermark() {
- return 0;
- }
- }
-
- private static class SalesReduceFunction implements ReduceFunction<Tuple2<Date, HashMap<Character, Integer>>> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Tuple2<Date, HashMap<Character, Integer>> reduce(Tuple2<Date, HashMap<Character, Integer>> value1,
- Tuple2<Date,
- HashMap<Character, Integer>> value2) throws Exception {
- HashMap<Character, Integer> map1 = value1.f1;
- HashMap<Character, Integer> map2 = value2.f1;
- for (Character key : map2.keySet()) {
- Integer volume1 = map1.get(key);
- Integer volume2 = map2.get(key);
- if (volume1 == null) {
- volume1 = 0;
- }
- map1.put(key, volume1 + volume2);
- }
- return new Tuple2<>(value2.f0, map1);
- }
- }
-
- private static class FlatMapFunction6 implements FlatMapFunction<Tuple2<Date, HashMap<Character, Integer>>, Tuple2<Integer,
- Tuple2<Character, Integer>>> {
-
- @Override
- public void flatMap(Tuple2<Date, HashMap<Character, Integer>> value, Collector<Tuple2<Integer,
- Tuple2<Character, Integer>>> out) throws Exception {
- Calendar cal = Calendar.getInstance();
- cal.setTime(value.f0);
- for (Character key : value.f1.keySet()) {
- out.collect(new Tuple2<>(cal.get(Calendar.MONTH)
- + 1,
- new Tuple2<>(key, value.f1.get(key))));
- }
- }
- }
-
- private static class MapFunction6 implements MapFunction<Tuple2<Date, HashMap<Character, Integer>>, ArrayList<Character>> {
-
- @Override
- public ArrayList<Character> map(Tuple2<Date, HashMap<Character, Integer>> value)
- throws Exception {
- ArrayList<Character> list = new ArrayList<>();
- for (Character ch : value.f1.keySet()) {
- for (int i = 0; i < value.f1.get(ch); i++) {
- list.add(ch);
- }
- }
- Collections.sort(list);
- return list;
- }
- }
-
- // *************************************************************************
- // DATA TYPES
- // *************************************************************************
-
- //Flink Pojo
- public static class InnerPojo {
- public Long f0;
- public String f1;
-
- //default constructor to qualify as Flink POJO
- InnerPojo(){}
-
- public InnerPojo(Long f0, String f1) {
- this.f0 = f0;
- this.f1 = f1;
- }
-
- @Override
- public String toString() {
- return "POJO(" + f0 + "," + f1 + ")";
- }
- }
-
- // Nested class serialized with Kryo
- public static class OuterPojo {
- public InnerPojo f0;
- public Long f1;
-
- public OuterPojo(InnerPojo f0, Long f1) {
- this.f0 = f0;
- this.f1 = f1;
- }
-
- @Override
- public String toString() {
- return "POJO(" + f0 + "," + f1 + ")";
- }
- }
-
- public static class Rectangle {
-
- public int a;
- public int b;
-
- //default constructor to qualify as Flink POJO
- public Rectangle() {}
-
- public Rectangle(int a, int b) {
- this.a = a;
- this.b = b;
- }
-
- public Rectangle next() {
- return new Rectangle(a + (b % 11), b + (a % 9));
- }
-
- @Override
- public String toString() {
- return "(" + a + "," + b + ")";
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
deleted file mode 100644
index 41bd381..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.ValueTypeInfo;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.testutils.CommonTestUtils;
-import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.types.Value;
-import org.apache.flink.util.ExceptionUtils;
-
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.junit.Assert.*;
-
-/**
- * Tests for the {@link org.apache.flink.streaming.api.functions.source.FromElementsFunction}.
- */
-public class FromElementsFunctionTest {
-
- @Test
- public void testStrings() {
- try {
- String[] data = { "Oh", "boy", "what", "a", "show", "!"};
-
- FromElementsFunction<String> source = new FromElementsFunction<String>(
- BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), data);
-
- List<String> result = new ArrayList<String>();
- source.run(new ListSourceContext<String>(result));
-
- assertEquals(Arrays.asList(data), result);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testNonJavaSerializableType() {
- try {
- MyPojo[] data = { new MyPojo(1, 2), new MyPojo(3, 4), new MyPojo(5, 6) };
-
- FromElementsFunction<MyPojo> source = new FromElementsFunction<MyPojo>(
- TypeExtractor.getForClass(MyPojo.class).createSerializer(new ExecutionConfig()), data);
-
- List<MyPojo> result = new ArrayList<MyPojo>();
- source.run(new ListSourceContext<MyPojo>(result));
-
- assertEquals(Arrays.asList(data), result);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testSerializationError() {
- try {
- TypeInformation<SerializationErrorType> info =
- new ValueTypeInfo<SerializationErrorType>(SerializationErrorType.class);
-
- try {
- new FromElementsFunction<SerializationErrorType>(
- info.createSerializer(new ExecutionConfig()), new SerializationErrorType());
-
- fail("should fail with an exception");
- }
- catch (IOException e) {
- assertTrue(ExceptionUtils.stringifyException(e).contains("test exception"));
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testDeSerializationError() {
- try {
- TypeInformation<DeserializeTooMuchType> info =
- new ValueTypeInfo<DeserializeTooMuchType>(DeserializeTooMuchType.class);
-
- FromElementsFunction<DeserializeTooMuchType> source = new FromElementsFunction<DeserializeTooMuchType>(
- info.createSerializer(new ExecutionConfig()), new DeserializeTooMuchType());
-
- try {
- source.run(new ListSourceContext<DeserializeTooMuchType>(new ArrayList<DeserializeTooMuchType>()));
- fail("should fail with an exception");
- }
- catch (IOException e) {
- assertTrue(ExceptionUtils.stringifyException(e).contains("user-defined serialization"));
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testCheckpointAndRestore() {
- try {
- final int NUM_ELEMENTS = 10000;
-
- List<Integer> data = new ArrayList<Integer>(NUM_ELEMENTS);
- List<Integer> result = new ArrayList<Integer>(NUM_ELEMENTS);
-
- for (int i = 0; i < NUM_ELEMENTS; i++) {
- data.add(i);
- }
-
- final FromElementsFunction<Integer> source = new FromElementsFunction<Integer>(IntSerializer.INSTANCE, data);
- final FromElementsFunction<Integer> sourceCopy = CommonTestUtils.createCopySerializable(source);
-
- final SourceFunction.SourceContext<Integer> ctx = new ListSourceContext<Integer>(result, 2L);
-
- final Throwable[] error = new Throwable[1];
-
- // run the source asynchronously
- Thread runner = new Thread() {
- @Override
- public void run() {
- try {
- source.run(ctx);
- }
- catch (Throwable t) {
- error[0] = t;
- }
- }
- };
- runner.start();
-
- // wait for a bit
- Thread.sleep(1000);
-
- // make a checkpoint
- int count;
- List<Integer> checkpointData = new ArrayList<Integer>(NUM_ELEMENTS);
-
- synchronized (ctx.getCheckpointLock()) {
- count = source.snapshotState(566, System.currentTimeMillis());
- checkpointData.addAll(result);
- }
-
- // cancel the source
- source.cancel();
- runner.join();
-
- // check for errors
- if (error[0] != null) {
- System.err.println("Error in asynchronous source runner");
- error[0].printStackTrace();
- fail("Error in asynchronous source runner");
- }
-
- // recovery run
- SourceFunction.SourceContext<Integer> newCtx = new ListSourceContext<Integer>(checkpointData);
- sourceCopy.restoreState(count);
-
- sourceCopy.run(newCtx);
-
- assertEquals(data, checkpointData);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
-
- // ------------------------------------------------------------------------
- // Test Types
- // ------------------------------------------------------------------------
-
- public static class MyPojo {
-
- public long val1;
- public int val2;
-
- public MyPojo() {}
-
- public MyPojo(long val1, int val2) {
- this.val1 = val1;
- this.val2 = val2;
- }
-
- @Override
- public int hashCode() {
- return this.val2;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof MyPojo) {
- MyPojo that = (MyPojo) obj;
- return this.val1 == that.val1 && this.val2 == that.val2;
- }
- else {
- return false;
- }
- }
- }
-
- public static class SerializationErrorType implements Value {
-
- private static final long serialVersionUID = -6037206294939421807L;
-
- @Override
- public void write(DataOutputView out) throws IOException {
- throw new IOException("test exception");
- }
-
- @Override
- public void read(DataInputView in) throws IOException {
- throw new IOException("test exception");
- }
- }
-
- public static class DeserializeTooMuchType implements Value {
-
- private static final long serialVersionUID = -6037206294939421807L;
-
- @Override
- public void write(DataOutputView out) throws IOException {
- out.writeInt(42);
- }
-
- @Override
- public void read(DataInputView in) throws IOException {
- in.readLong();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java
deleted file mode 100644
index e4dadf0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions;
-
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-
-import java.util.List;
-
-/**
- * Mock context that collects elements in a List.
- *
- * @param <T> Type of the collected elements.
- */
-public class ListSourceContext<T> implements SourceFunction.SourceContext<T> {
-
- private final Object lock = new Object();
-
- private final List<T> target;
-
- private final long delay;
-
-
- public ListSourceContext(List<T> target) {
- this(target, 0L);
- }
-
- public ListSourceContext(List<T> target, long delay) {
- this.target = target;
- this.delay = delay;
- }
-
- @Override
- public void collect(T element) {
- target.add(element);
-
- if (delay > 0) {
- try {
- Thread.sleep(delay);
- }
- catch (InterruptedException e) {
- // ignore
- }
- }
- }
-
- @Override
- public void collectWithTimestamp(T element, long timestamp) {
- target.add(element);
- }
-
- @Override
- public void emitWatermark(Watermark mark) {
- // don't do anything
- }
-
- @Override
- public Object getCheckpointLock() {
- return lock;
- }
-
- @Override
- public void close() {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
deleted file mode 100644
index f7c6e53..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.io.ByteArrayOutputStream;
-import java.io.PrintStream;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Tests for the {@link org.apache.flink.streaming.api.functions.sink.PrintSinkFunction}.
- */
-public class PrintSinkFunctionTest {
-
- public PrintStream printStreamOriginal = System.out;
- private String line = System.lineSeparator();
-
- @Test
- public void testPrintSinkStdOut(){
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- PrintStream stream = new PrintStream(baos);
- System.setOut(stream);
-
- final StreamingRuntimeContext ctx = Mockito.mock(StreamingRuntimeContext.class);
-
- PrintSinkFunction<String> printSink = new PrintSinkFunction<>();
- printSink.setRuntimeContext(ctx);
- try {
- printSink.open(new Configuration());
- } catch (Exception e) {
- Assert.fail();
- }
- printSink.setTargetToStandardOut();
- printSink.invoke("hello world!");
-
- assertEquals("Print to System.out", printSink.toString());
- assertEquals("hello world!" + line, baos.toString());
-
- printSink.close();
- stream.close();
- }
-
- @Test
- public void testPrintSinkStdErr(){
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- PrintStream stream = new PrintStream(baos);
- System.setOut(stream);
-
- final StreamingRuntimeContext ctx = Mockito.mock(StreamingRuntimeContext.class);
-
- PrintSinkFunction<String> printSink = new PrintSinkFunction<>();
- printSink.setRuntimeContext(ctx);
- try {
- printSink.open(new Configuration());
- } catch (Exception e) {
- Assert.fail();
- }
- printSink.setTargetToStandardErr();
- printSink.invoke("hello world!");
-
- assertEquals("Print to System.err", printSink.toString());
- assertEquals("hello world!" + line, baos.toString());
-
- printSink.close();
- stream.close();
- }
-
- @Test
- public void testPrintSinkWithPrefix(){
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- PrintStream stream = new PrintStream(baos);
- System.setOut(stream);
-
- final StreamingRuntimeContext ctx = Mockito.mock(StreamingRuntimeContext.class);
- Mockito.when(ctx.getNumberOfParallelSubtasks()).thenReturn(2);
- Mockito.when(ctx.getIndexOfThisSubtask()).thenReturn(1);
-
- PrintSinkFunction<String> printSink = new PrintSinkFunction<>();
- printSink.setRuntimeContext(ctx);
- try {
- printSink.open(new Configuration());
- } catch (Exception e) {
- Assert.fail();
- }
- printSink.setTargetToStandardErr();
- printSink.invoke("hello world!");
-
- assertEquals("Print to System.err", printSink.toString());
- assertEquals("2> hello world!" + line, baos.toString());
-
- printSink.close();
- stream.close();
- }
-
- @After
- public void restoreSystemOut() {
- System.setOut(printStreamOriginal);
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
deleted file mode 100644
index 8f4acde..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
+++ /dev/null
@@ -1,303 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.sink;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import org.apache.flink.util.TestLogger;
-import org.junit.Test;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
- */
-@SuppressWarnings("serial")
-public class SocketClientSinkTest extends TestLogger {
-
- private static final String TEST_MESSAGE = "testSocketSinkInvoke";
-
- private static final String EXCEPTION_MESSGAE = "Failed to send message '" + TEST_MESSAGE + "\n'";
-
- private static final String host = "127.0.0.1";
-
- private SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
- @Override
- public byte[] serialize(String element) {
- return element.getBytes();
- }
- };
-
- @Test
- public void testSocketSink() throws Exception {
- final ServerSocket server = new ServerSocket(0);
- final int port = server.getLocalPort();
-
- final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
-
- Thread sinkRunner = new Thread("Test sink runner") {
- @Override
- public void run() {
- try {
- SocketClientSink<String> simpleSink = new SocketClientSink<>(host, port, simpleSchema, 0);
- simpleSink.open(new Configuration());
- simpleSink.invoke(TEST_MESSAGE + '\n');
- simpleSink.close();
- }
- catch (Throwable t) {
- error.set(t);
- }
- }
- };
-
- sinkRunner.start();
-
- Socket sk = server.accept();
- BufferedReader rdr = new BufferedReader(new InputStreamReader(sk.getInputStream()));
-
- String value = rdr.readLine();
-
- sinkRunner.join();
- server.close();
-
- if (error.get() != null) {
- Throwable t = error.get();
- t.printStackTrace();
- fail("Error in spawned thread: " + t.getMessage());
- }
-
- assertEquals(TEST_MESSAGE, value);
- }
-
- @Test
- public void testSinkAutoFlush() throws Exception {
- final ServerSocket server = new ServerSocket(0);
- final int port = server.getLocalPort();
-
- final SocketClientSink<String> simpleSink = new SocketClientSink<>(host, port, simpleSchema, 0, true);
- simpleSink.open(new Configuration());
-
- final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
-
- Thread sinkRunner = new Thread("Test sink runner") {
- @Override
- public void run() {
- try {
- // need two messages here: send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
- simpleSink.invoke(TEST_MESSAGE + '\n');
- }
- catch (Throwable t) {
- error.set(t);
- }
- }
- };
-
- sinkRunner.start();
-
- Socket sk = server.accept();
- BufferedReader rdr = new BufferedReader(new InputStreamReader(sk.getInputStream()));
- String value = rdr.readLine();
-
- sinkRunner.join();
- simpleSink.close();
- server.close();
-
- if (error.get() != null) {
- Throwable t = error.get();
- t.printStackTrace();
- fail("Error in spawned thread: " + t.getMessage());
- }
-
- assertEquals(TEST_MESSAGE, value);
- }
-
- @Test
- public void testSocketSinkNoRetry() throws Exception {
- final ServerSocket server = new ServerSocket(0);
- final int port = server.getLocalPort();
-
- try {
- final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
-
- Thread serverRunner = new Thread("Test server runner") {
-
- @Override
- public void run() {
- try {
- Socket sk = server.accept();
- sk.close();
- }
- catch (Throwable t) {
- error.set(t);
- }
- }
- };
- serverRunner.start();
-
- SocketClientSink<String> simpleSink = new SocketClientSink<>(host, port, simpleSchema, 0, true);
- simpleSink.open(new Configuration());
-
- // wait socket server to close
- serverRunner.join();
- if (error.get() != null) {
- Throwable t = error.get();
- t.printStackTrace();
- fail("Error in server thread: " + t.getMessage());
- }
-
- try {
- // socket should be closed, so this should trigger a re-try
- // need two messages here: send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
- while (true) { // we have to do this more often as the server side closed is not guaranteed to be noticed immediately
- simpleSink.invoke(TEST_MESSAGE + '\n');
- }
- }
- catch (IOException e) {
- // check whether throw a exception that reconnect failed.
- assertTrue("Wrong exception", e.getMessage().contains(EXCEPTION_MESSGAE));
- }
- catch (Exception e) {
- fail("wrong exception: " + e.getClass().getName() + " - " + e.getMessage());
- }
-
- assertEquals(0, simpleSink.getCurrentNumberOfRetries());
- }
- finally {
- IOUtils.closeQuietly(server);
- }
- }
-
- @Test
- public void testRetry() throws Exception {
-
- final ServerSocket[] serverSocket = new ServerSocket[1];
- final ExecutorService[] executor = new ExecutorService[1];
-
- try {
- serverSocket[0] = new ServerSocket(0);
- executor[0] = Executors.newCachedThreadPool();
-
- int port = serverSocket[0].getLocalPort();
-
- Callable<Void> serverTask = new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- Socket socket = serverSocket[0].accept();
-
- BufferedReader reader = new BufferedReader(new InputStreamReader(
- socket.getInputStream()));
-
- String value = reader.readLine();
- assertEquals("0", value);
-
- socket.close();
- return null;
- }
- };
-
- Future<Void> serverFuture = executor[0].submit(serverTask);
-
- final SocketClientSink<String> sink = new SocketClientSink<>(
- host, serverSocket[0].getLocalPort(), simpleSchema, -1, true);
-
- // Create the connection
- sink.open(new Configuration());
-
- // Initial payload => this will be received by the server an then the socket will be
- // closed.
- sink.invoke("0\n");
-
- // Get future an make sure there was no problem. This will rethrow any Exceptions from
- // the server.
- serverFuture.get();
-
- // Shutdown the server socket
- serverSocket[0].close();
- assertTrue(serverSocket[0].isClosed());
-
- // No retries expected at this point
- assertEquals(0, sink.getCurrentNumberOfRetries());
-
- final CountDownLatch retryLatch = new CountDownLatch(1);
- final CountDownLatch again = new CountDownLatch(1);
-
- Callable<Void> sinkTask = new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- // Send next payload => server is down, should try to reconnect.
-
- // We need to send more than just one packet to notice the closed connection.
- while (retryLatch.getCount() != 0) {
- sink.invoke("1\n");
- }
-
- return null;
- }
- };
-
- Future<Void> sinkFuture = executor[0].submit(sinkTask);
-
- while (sink.getCurrentNumberOfRetries() == 0) {
- // Wait for a retry
- Thread.sleep(100);
- }
-
- // OK the poor guy retried to write
- retryLatch.countDown();
-
- // Restart the server
- serverSocket[0] = new ServerSocket(port);
- Socket socket = serverSocket[0].accept();
-
- BufferedReader reader = new BufferedReader(new InputStreamReader(
- socket.getInputStream()));
-
- // Wait for the reconnect
- String value = reader.readLine();
-
- assertEquals("1", value);
-
- // OK the sink re-connected. :)
- }
- finally {
- if (serverSocket[0] != null) {
- serverSocket[0].close();
- }
-
- if (executor[0] != null) {
- executor[0].shutdown();
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java
deleted file mode 100644
index 2d9921a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.source;
-
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.operators.StreamSource;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.junit.Test;
-
-/**
- * Tests for the {@link org.apache.flink.streaming.api.functions.source.FileMonitoringFunction}.
- */
-public class FileMonitoringFunctionTest {
-
- @Test
- public void testForEmptyLocation() throws Exception {
- final FileMonitoringFunction fileMonitoringFunction
- = new FileMonitoringFunction("?non-existing-path", 1L, FileMonitoringFunction.WatchType.ONLY_NEW_FILES);
-
- new Thread() {
- @Override
- public void run() {
- try {
- Thread.sleep(1000L);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- fileMonitoringFunction.cancel();
- }
- }.start();
-
- fileMonitoringFunction.run(
- new StreamSource.NonWatermarkContext<Tuple3<String, Long, Long>>(
- new Object(),
- new Output<StreamRecord<Tuple3<String, Long, Long>>>() {
- @Override
- public void emitWatermark(Watermark mark) { }
- @Override
- public void collect(StreamRecord<Tuple3<String, Long, Long>> record) { }
- @Override
- public void close() { }
- })
- );
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
deleted file mode 100644
index 3398451..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
+++ /dev/null
@@ -1,349 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.source;
-
-import org.apache.commons.io.IOUtils;
-
-import org.apache.flink.streaming.api.watermark.Watermark;
-
-import org.junit.Test;
-
-import java.io.EOFException;
-import java.io.OutputStreamWriter;
-import java.net.ServerSocket;
-import java.net.Socket;
-
-import static org.junit.Assert.*;
-
-/**
- * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
- */
-public class SocketTextStreamFunctionTest {
-
- private static final String LOCALHOST = "127.0.0.1";
-
-
- @Test
- public void testSocketSourceSimpleOutput() throws Exception {
- ServerSocket server = new ServerSocket(0);
- Socket channel = null;
-
- try {
- SocketTextStreamFunction source = new SocketTextStreamFunction(LOCALHOST, server.getLocalPort(), '\n', 0);
-
- SocketSourceThread runner = new SocketSourceThread(source, "test1", "check");
- runner.start();
-
- channel = server.accept();
- OutputStreamWriter writer = new OutputStreamWriter(channel.getOutputStream());
-
- writer.write("test1\n");
- writer.write("check\n");
- writer.flush();
- runner.waitForNumElements(2);
-
- runner.cancel();
- runner.interrupt();
-
- runner.waitUntilDone();
-
- channel.close();
- }
- finally {
- if (channel != null) {
- IOUtils.closeQuietly(channel);
- }
- IOUtils.closeQuietly(server);
- }
- }
-
- @Test
- public void testExitNoRetries() throws Exception {
- ServerSocket server = new ServerSocket(0);
- Socket channel = null;
-
- try {
- SocketTextStreamFunction source = new SocketTextStreamFunction(LOCALHOST, server.getLocalPort(), '\n', 0);
-
- SocketSourceThread runner = new SocketSourceThread(source);
- runner.start();
-
- channel = server.accept();
- channel.close();
-
- try {
- runner.waitUntilDone();
- }
- catch (Exception e) {
- assertTrue(e.getCause() instanceof EOFException);
- }
- }
- finally {
- if (channel != null) {
- IOUtils.closeQuietly(channel);
- }
- IOUtils.closeQuietly(server);
- }
- }
-
- @Test
- public void testSocketSourceOutputWithRetries() throws Exception {
- ServerSocket server = new ServerSocket(0);
- Socket channel = null;
-
- try {
- SocketTextStreamFunction source = new SocketTextStreamFunction(LOCALHOST, server.getLocalPort(), '\n', 10, 100);
-
- SocketSourceThread runner = new SocketSourceThread(source, "test1", "check");
- runner.start();
-
- // first connection: nothing
- channel = server.accept();
- channel.close();
-
- // second connection: first string
- channel = server.accept();
- OutputStreamWriter writer = new OutputStreamWriter(channel.getOutputStream());
- writer.write("test1\n");
- writer.close();
- channel.close();
-
- // third connection: nothing
- channel = server.accept();
- channel.close();
-
- // forth connection: second string
- channel = server.accept();
- writer = new OutputStreamWriter(channel.getOutputStream());
- writer.write("check\n");
- writer.flush();
-
- runner.waitForNumElements(2);
- runner.cancel();
- runner.waitUntilDone();
- }
- finally {
- if (channel != null) {
- IOUtils.closeQuietly(channel);
- }
- IOUtils.closeQuietly(server);
- }
- }
-
- @Test
- public void testSocketSourceOutputInfiniteRetries() throws Exception {
- ServerSocket server = new ServerSocket(0);
- Socket channel = null;
-
- try {
- SocketTextStreamFunction source = new SocketTextStreamFunction(LOCALHOST, server.getLocalPort(), '\n', -1, 100);
-
- SocketSourceThread runner = new SocketSourceThread(source, "test1", "check");
- runner.start();
-
- // first connection: nothing
- channel = server.accept();
- channel.close();
-
- // second connection: first string
- channel = server.accept();
- OutputStreamWriter writer = new OutputStreamWriter(channel.getOutputStream());
- writer.write("test1\n");
- writer.close();
- channel.close();
-
- // third connection: nothing
- channel = server.accept();
- channel.close();
-
- // forth connection: second string
- channel = server.accept();
- writer = new OutputStreamWriter(channel.getOutputStream());
- writer.write("check\n");
- writer.flush();
-
- runner.waitForNumElements(2);
- runner.cancel();
- runner.waitUntilDone();
- }
- finally {
- if (channel != null) {
- IOUtils.closeQuietly(channel);
- }
- IOUtils.closeQuietly(server);
- }
- }
-
- @Test
- public void testSocketSourceOutputAcrossRetries() throws Exception {
- ServerSocket server = new ServerSocket(0);
- Socket channel = null;
-
- try {
- SocketTextStreamFunction source = new SocketTextStreamFunction(LOCALHOST, server.getLocalPort(), '\n', 10, 100);
-
- SocketSourceThread runner = new SocketSourceThread(source, "test1", "check1", "check2");
- runner.start();
-
- // first connection: nothing
- channel = server.accept();
- channel.close();
-
- // second connection: first string
- channel = server.accept();
- OutputStreamWriter writer = new OutputStreamWriter(channel.getOutputStream());
- writer.write("te");
- writer.close();
- channel.close();
-
- // third connection: nothing
- channel = server.accept();
- channel.close();
-
- // forth connection: second string
- channel = server.accept();
- writer = new OutputStreamWriter(channel.getOutputStream());
- writer.write("st1\n");
- writer.write("check1\n");
- writer.write("check2\n");
- writer.flush();
-
- runner.waitForNumElements(2);
- runner.cancel();
- runner.waitUntilDone();
- }
- finally {
- if (channel != null) {
- IOUtils.closeQuietly(channel);
- }
- IOUtils.closeQuietly(server);
- }
- }
-
- // ------------------------------------------------------------------------
-
- private static class SocketSourceThread extends Thread {
-
- private final Object sync = new Object();
-
- private final SocketTextStreamFunction socketSource;
-
- private final String[] expectedData;
-
- private volatile Throwable error;
- private volatile int numElementsReceived;
- private volatile boolean canceled;
- private volatile boolean done;
-
- public SocketSourceThread(SocketTextStreamFunction socketSource, String... expectedData) {
- this.socketSource = socketSource;
- this.expectedData = expectedData;
- }
-
- public void run() {
- try {
- SourceFunction.SourceContext<String> ctx = new SourceFunction.SourceContext<String>() {
-
- private final Object lock = new Object();
-
- @Override
- public void collect(String element) {
- int pos = numElementsReceived;
-
- // make sure waiter know of us
- synchronized (sync) {
- numElementsReceived++;
- sync.notifyAll();
- }
-
- if (expectedData != null && expectedData.length > pos) {
- assertEquals(expectedData[pos], element);
- }
- }
-
- @Override
- public void collectWithTimestamp(String element, long timestamp) {
- collect(element);
- }
-
- @Override
- public void emitWatermark(Watermark mark) {}
-
- @Override
- public Object getCheckpointLock() {
- return lock;
- }
-
- @Override
- public void close() {}
- };
-
- socketSource.run(ctx);
- }
- catch (Throwable t) {
- synchronized (sync) {
- if (!canceled) {
- error = t;
- }
- sync.notifyAll();
- }
- }
- finally {
- synchronized (sync) {
- done = true;
- sync.notifyAll();
- }
- }
- }
-
- public void cancel() {
- synchronized (sync) {
- canceled = true;
- socketSource.cancel();
- interrupt();
- }
- }
-
- public void waitForNumElements(int numElements) throws InterruptedException {
- synchronized (sync) {
- while (error == null && !canceled && !done && numElementsReceived < numElements) {
- sync.wait();
- }
-
- if (error != null) {
- throw new RuntimeException("Error in source thread", error);
- }
- if (canceled) {
- throw new RuntimeException("canceled");
- }
- if (done) {
- throw new RuntimeException("Exited cleanly before expected number of elements");
- }
- }
- }
-
- public void waitUntilDone() throws InterruptedException {
- join();
-
- if (error != null) {
- throw new RuntimeException("Error in source thread", error);
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ArrayFromTupleTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ArrayFromTupleTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ArrayFromTupleTest.java
deleted file mode 100644
index c98a659..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ArrayFromTupleTest.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple10;
-import org.apache.flink.api.java.tuple.Tuple11;
-import org.apache.flink.api.java.tuple.Tuple12;
-import org.apache.flink.api.java.tuple.Tuple13;
-import org.apache.flink.api.java.tuple.Tuple14;
-import org.apache.flink.api.java.tuple.Tuple15;
-import org.apache.flink.api.java.tuple.Tuple16;
-import org.apache.flink.api.java.tuple.Tuple17;
-import org.apache.flink.api.java.tuple.Tuple18;
-import org.apache.flink.api.java.tuple.Tuple19;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple20;
-import org.apache.flink.api.java.tuple.Tuple21;
-import org.apache.flink.api.java.tuple.Tuple22;
-import org.apache.flink.api.java.tuple.Tuple23;
-import org.apache.flink.api.java.tuple.Tuple24;
-import org.apache.flink.api.java.tuple.Tuple25;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.tuple.Tuple6;
-import org.apache.flink.api.java.tuple.Tuple7;
-import org.apache.flink.api.java.tuple.Tuple8;
-import org.apache.flink.api.java.tuple.Tuple9;
-import org.apache.flink.streaming.api.functions.windowing.delta.extractor.ArrayFromTuple;
-import org.junit.Before;
-import org.junit.Test;
-
-public class ArrayFromTupleTest {
-
- private String[] testStrings;
-
- @Before
- public void init() {
- testStrings = new String[Tuple.MAX_ARITY];
- for (int i = 0; i < Tuple.MAX_ARITY; i++) {
- testStrings[i] = Integer.toString(i);
- }
- }
-
- @Test
- public void testConvertFromTupleToArray() throws InstantiationException, IllegalAccessException {
- for (int i = 0; i < Tuple.MAX_ARITY; i++) {
- Tuple currentTuple = (Tuple) CLASSES[i].newInstance();
- String[] currentArray = new String[i + 1];
- for (int j = 0; j <= i; j++) {
- currentTuple.setField(testStrings[j], j);
- currentArray[j] = testStrings[j];
- }
- arrayEqualityCheck(currentArray, new ArrayFromTuple().extract(currentTuple));
- }
- }
-
- @Test
- public void testUserSpecifiedOrder() throws InstantiationException, IllegalAccessException {
- Tuple currentTuple = (Tuple) CLASSES[Tuple.MAX_ARITY - 1].newInstance();
- for (int i = 0; i < Tuple.MAX_ARITY; i++) {
- currentTuple.setField(testStrings[i], i);
- }
-
- String[] expected = { testStrings[5], testStrings[3], testStrings[6], testStrings[7],
- testStrings[0] };
- arrayEqualityCheck(expected, new ArrayFromTuple(5, 3, 6, 7, 0).extract(currentTuple));
-
- String[] expected2 = { testStrings[0], testStrings[Tuple.MAX_ARITY - 1] };
- arrayEqualityCheck(expected2,
- new ArrayFromTuple(0, Tuple.MAX_ARITY - 1).extract(currentTuple));
-
- String[] expected3 = { testStrings[Tuple.MAX_ARITY - 1], testStrings[0] };
- arrayEqualityCheck(expected3,
- new ArrayFromTuple(Tuple.MAX_ARITY - 1, 0).extract(currentTuple));
-
- String[] expected4 = { testStrings[13], testStrings[4], testStrings[5], testStrings[4],
- testStrings[2], testStrings[8], testStrings[6], testStrings[2], testStrings[8],
- testStrings[3], testStrings[5], testStrings[2], testStrings[16], testStrings[4],
- testStrings[3], testStrings[2], testStrings[6], testStrings[4], testStrings[7],
- testStrings[4], testStrings[2], testStrings[8], testStrings[7], testStrings[2] };
- arrayEqualityCheck(expected4, new ArrayFromTuple(13, 4, 5, 4, 2, 8, 6, 2, 8, 3, 5, 2, 16,
- 4, 3, 2, 6, 4, 7, 4, 2, 8, 7, 2).extract(currentTuple));
- }
-
- private void arrayEqualityCheck(Object[] array1, Object[] array2) {
- assertEquals("The result arrays must have the same length", array1.length, array2.length);
- for (int i = 0; i < array1.length; i++) {
- assertEquals("Unequal fields at position " + i, array1[i], array2[i]);
- }
- }
-
- private static final Class<?>[] CLASSES = new Class<?>[] { Tuple1.class, Tuple2.class,
- Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class,
- Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class,
- Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class,
- Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class,
- Tuple24.class, Tuple25.class };
-}