You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:41 UTC
[25/51] [partial] flink git commit: [FLINK-2877] Move Streaming API
out of Staging package
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java
deleted file mode 100644
index 4dbf7b8..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.co;
-
-import static org.junit.Assert.fail;
-
-import java.io.Serializable;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
-import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Tests for {@link CoStreamFlatMap}. These test that:
- *
- * <ul>
- * <li>RichFunction methods are called correctly</li>
- * <li>Timestamps of processed elements match the input timestamp</li>
- * <li>Watermarks are correctly forwarded</li>
- * </ul>
- */
-public class CoStreamFlatMapTest implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private final static class MyCoFlatMap implements CoFlatMapFunction<String, Integer, String> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void flatMap1(String value, Collector<String> coll) {
- for (int i = 0; i < value.length(); i++) {
- coll.collect(value.substring(i, i + 1));
- }
- }
-
- @Override
- public void flatMap2(Integer value, Collector<String> coll) {
- coll.collect(value.toString());
- }
- }
-
- @Test
- public void testCoFlatMap() throws Exception {
- CoStreamFlatMap<String, Integer, String> operator = new CoStreamFlatMap<String, Integer, String>(new MyCoFlatMap());
-
- TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness = new TwoInputStreamOperatorTestHarness<String, Integer, String>(operator);
-
- long initialTime = 0L;
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
-
- testHarness.open();
-
- testHarness.processElement1(new StreamRecord<String>("abc", initialTime + 1));
- testHarness.processElement1(new StreamRecord<String>("def", initialTime + 2));
- testHarness.processWatermark1(new Watermark(initialTime + 2));
- testHarness.processElement1(new StreamRecord<String>("ghi", initialTime + 3));
-
- testHarness.processElement2(new StreamRecord<Integer>(1, initialTime + 1));
- testHarness.processElement2(new StreamRecord<Integer>(2, initialTime + 2));
- testHarness.processWatermark2(new Watermark(initialTime + 3));
- testHarness.processElement2(new StreamRecord<Integer>(3, initialTime + 3));
- testHarness.processElement2(new StreamRecord<Integer>(4, initialTime + 4));
- testHarness.processElement2(new StreamRecord<Integer>(5, initialTime + 5));
-
- expectedOutput.add(new StreamRecord<String>("a", initialTime + 1));
- expectedOutput.add(new StreamRecord<String>("b", initialTime + 1));
- expectedOutput.add(new StreamRecord<String>("c", initialTime + 1));
- expectedOutput.add(new StreamRecord<String>("d", initialTime + 2));
- expectedOutput.add(new StreamRecord<String>("e", initialTime + 2));
- expectedOutput.add(new StreamRecord<String>("f", initialTime + 2));
- expectedOutput.add(new StreamRecord<String>("g", initialTime + 3));
- expectedOutput.add(new StreamRecord<String>("h", initialTime + 3));
- expectedOutput.add(new StreamRecord<String>("i", initialTime + 3));
-
- expectedOutput.add(new StreamRecord<String>("1", initialTime + 1));
- expectedOutput.add(new StreamRecord<String>("2", initialTime + 2));
- expectedOutput.add(new Watermark(initialTime + 2));
- expectedOutput.add(new StreamRecord<String>("3", initialTime + 3));
- expectedOutput.add(new StreamRecord<String>("4", initialTime + 4));
- expectedOutput.add(new StreamRecord<String>("5", initialTime + 5));
-
- TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
- }
-
- @Test
- public void testOpenClose() throws Exception {
- CoStreamFlatMap<String, Integer, String> operator = new CoStreamFlatMap<String, Integer, String>(new TestOpenCloseCoFlatMapFunction());
-
- TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness = new TwoInputStreamOperatorTestHarness<String, Integer, String>(operator);
-
- long initialTime = 0L;
-
- testHarness.open();
-
- testHarness.processElement1(new StreamRecord<String>("Hello", initialTime));
- testHarness.processElement2(new StreamRecord<Integer>(42, initialTime));
-
- testHarness.close();
-
- Assert.assertTrue("RichFunction methods where not called.", TestOpenCloseCoFlatMapFunction.closeCalled);
- Assert.assertTrue("Output contains no elements.", testHarness.getOutput().size() > 0);
- }
-
- // This must only be used in one test, otherwise the static fields will be changed
- // by several tests concurrently
- private static class TestOpenCloseCoFlatMapFunction extends RichCoFlatMapFunction<String, Integer, String> {
- private static final long serialVersionUID = 1L;
-
- public static boolean openCalled = false;
- public static boolean closeCalled = false;
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- if (closeCalled) {
- Assert.fail("Close called before open.");
- }
- openCalled = true;
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- if (!openCalled) {
- Assert.fail("Open was not called before close.");
- }
- closeCalled = true;
- }
-
- @Override
- public void flatMap1(String value, Collector<String> out) throws Exception {
- if (!openCalled) {
- Assert.fail("Open was not called before run.");
- }
- out.collect(value);
- }
-
- @Override
- public void flatMap2(Integer value, Collector<String> out) throws Exception {
- if (!openCalled) {
- Assert.fail("Open was not called before run.");
- }
- out.collect(value.toString());
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamMapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamMapTest.java
deleted file mode 100644
index 28ae664..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamMapTest.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.co;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.co.CoMapFunction;
-import org.apache.flink.streaming.api.functions.co.RichCoMapFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.Serializable;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-/**
- * Tests for {@link org.apache.flink.streaming.api.operators.co.CoStreamMap}. These test that:
- *
- * <ul>
- * <li>RichFunction methods are called correctly</li>
- * <li>Timestamps of processed elements match the input timestamp</li>
- * <li>Watermarks are correctly forwarded</li>
- * </ul>
- */
-public class CoStreamMapTest implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private final static class MyCoMap implements CoMapFunction<Double, Integer, String> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public String map1(Double value) {
- return value.toString();
- }
-
- @Override
- public String map2(Integer value) {
- return value.toString();
- }
- }
-
-
- @Test
- public void testCoMap() throws Exception {
- CoStreamMap<Double, Integer, String> operator = new CoStreamMap<Double, Integer, String>(new MyCoMap());
-
- TwoInputStreamOperatorTestHarness<Double, Integer, String> testHarness = new TwoInputStreamOperatorTestHarness<Double, Integer, String>(operator);
-
- long initialTime = 0L;
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
-
- testHarness.open();
-
- testHarness.processElement1(new StreamRecord<Double>(1.1d, initialTime + 1));
- testHarness.processElement1(new StreamRecord<Double>(1.2d, initialTime + 2));
- testHarness.processElement1(new StreamRecord<Double>(1.3d, initialTime + 3));
- testHarness.processWatermark1(new Watermark(initialTime + 3));
- testHarness.processElement1(new StreamRecord<Double>(1.4d, initialTime + 4));
- testHarness.processElement1(new StreamRecord<Double>(1.5d, initialTime + 5));
-
- testHarness.processElement2(new StreamRecord<Integer>(1, initialTime + 1));
- testHarness.processElement2(new StreamRecord<Integer>(2, initialTime + 2));
- testHarness.processWatermark2(new Watermark(initialTime + 2));
- testHarness.processElement2(new StreamRecord<Integer>(3, initialTime + 3));
- testHarness.processElement2(new StreamRecord<Integer>(4, initialTime + 4));
- testHarness.processElement2(new StreamRecord<Integer>(5, initialTime + 5));
-
- expectedOutput.add(new StreamRecord<String>("1.1", initialTime + 1));
- expectedOutput.add(new StreamRecord<String>("1.2", initialTime + 2));
- expectedOutput.add(new StreamRecord<String>("1.3", initialTime + 3));
- expectedOutput.add(new StreamRecord<String>("1.4", initialTime + 4));
- expectedOutput.add(new StreamRecord<String>("1.5", initialTime + 5));
-
- expectedOutput.add(new StreamRecord<String>("1", initialTime + 1));
- expectedOutput.add(new StreamRecord<String>("2", initialTime + 2));
- expectedOutput.add(new Watermark(initialTime + 2));
- expectedOutput.add(new StreamRecord<String>("3", initialTime + 3));
- expectedOutput.add(new StreamRecord<String>("4", initialTime + 4));
- expectedOutput.add(new StreamRecord<String>("5", initialTime + 5));
-
- TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
- }
-
- @Test
- public void testOpenClose() throws Exception {
- CoStreamMap<Double, Integer, String> operator = new CoStreamMap<Double, Integer, String>(new TestOpenCloseCoMapFunction());
-
- TwoInputStreamOperatorTestHarness<Double, Integer, String> testHarness = new TwoInputStreamOperatorTestHarness<Double, Integer, String>(operator);
-
- long initialTime = 0L;
-
- testHarness.open();
-
- testHarness.processElement1(new StreamRecord<Double>(74d, initialTime));
- testHarness.processElement2(new StreamRecord<Integer>(42, initialTime));
-
- testHarness.close();
-
- Assert.assertTrue("RichFunction methods where not called.", TestOpenCloseCoMapFunction.closeCalled);
- Assert.assertTrue("Output contains no elements.", testHarness.getOutput().size() > 0);
- }
-
- // This must only be used in one test, otherwise the static fields will be changed
- // by several tests concurrently
- private static class TestOpenCloseCoMapFunction extends RichCoMapFunction<Double, Integer, String> {
- private static final long serialVersionUID = 1L;
-
- public static boolean openCalled = false;
- public static boolean closeCalled = false;
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- if (closeCalled) {
- Assert.fail("Close called before open.");
- }
- openCalled = true;
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- if (!openCalled) {
- Assert.fail("Open was not called before close.");
- }
- closeCalled = true;
- }
-
- @Override
- public String map1(Double value) throws Exception {
- if (!openCalled) {
- Assert.fail("Open was not called before run.");
- }
- return value.toString();
- }
-
- @Override
- public String map2(Integer value) throws Exception {
- if (!openCalled) {
- Assert.fail("Open was not called before run.");
- }
- return value.toString();
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoWindowTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoWindowTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoWindowTest.java
deleted file mode 100644
index 130842e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoWindowTest.java
+++ /dev/null
@@ -1,182 +0,0 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one or more
-// * contributor license agreements. See the NOTICE file distributed with
-// * this work for additional information regarding copyright ownership.
-// * The ASF licenses this file to You under the Apache License, Version 2.0
-// * (the "License"); you may not use this file except in compliance with
-// * the License. You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package org.apache.flink.streaming.api.operators.co;
-//
-//import static org.junit.Assert.assertEquals;
-//
-//import java.util.ArrayList;
-//import java.util.HashSet;
-//import java.util.List;
-//import java.util.Set;
-//
-//import org.apache.flink.api.java.tuple.Tuple2;
-//import org.apache.flink.streaming.api.functions.co.CoWindowFunction;
-//import org.apache.flink.streaming.api.operators.co.CoStreamWindow;
-//import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-//import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-//import org.apache.flink.streaming.util.MockCoContext;
-//import org.apache.flink.util.Collector;
-//import org.junit.Test;
-//
-//public class CoWindowTest {
-//
-// public static final class MyCoGroup1 implements CoWindowFunction<Integer, Integer, Integer> {
-//
-// private static final long serialVersionUID = 1L;
-//
-// @SuppressWarnings("unused")
-// @Override
-// public void coWindow(List<Integer> first, List<Integer> second, Collector<Integer> out)
-// throws Exception {
-// Integer count1 = 0;
-// for (Integer i : first) {
-// count1++;
-// }
-// Integer count2 = 0;
-// for (Integer i : second) {
-// count2++;
-// }
-// out.collect(count1);
-// out.collect(count2);
-//
-// }
-//
-// }
-//
-// public static final class MyCoGroup2 implements
-// CoWindowFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer> {
-//
-// private static final long serialVersionUID = 1L;
-//
-// @Override
-// public void coWindow(List<Tuple2<Integer, Integer>> first,
-// List<Tuple2<Integer, Integer>> second, Collector<Integer> out) throws Exception {
-//
-// Set<Integer> firstElements = new HashSet<Integer>();
-// for (Tuple2<Integer, Integer> value : first) {
-// firstElements.add(value.f1);
-// }
-// for (Tuple2<Integer, Integer> value : second) {
-// if (firstElements.contains(value.f1)) {
-// out.collect(value.f1);
-// }
-// }
-//
-// }
-//
-// }
-//
-// private static final class MyTS1 implements Timestamp<Integer> {
-//
-// private static final long serialVersionUID = 1L;
-//
-// @Override
-// public long getTimestamp(Integer value) {
-// return value;
-// }
-//
-// }
-//
-// private static final class MyTS2 implements Timestamp<Tuple2<Integer, Integer>> {
-//
-// private static final long serialVersionUID = 1L;
-//
-// @Override
-// public long getTimestamp(Tuple2<Integer, Integer> value) {
-// return value.f0;
-// }
-//
-// }
-//
-// @Test
-// public void coWindowGroupReduceTest2() throws Exception {
-//
-// CoStreamWindow<Integer, Integer, Integer> invokable1 = new CoStreamWindow<Integer, Integer, Integer>(
-// new MyCoGroup1(), 2, 1, new TimestampWrapper<Integer>(new MyTS1(), 1),
-// new TimestampWrapper<Integer>(new MyTS1(), 1));
-//
-// // Windowsize 2, slide 1
-// // 1,2|2,3|3,4|4,5
-//
-// List<Integer> input11 = new ArrayList<Integer>();
-// input11.add(1);
-// input11.add(1);
-// input11.add(2);
-// input11.add(3);
-// input11.add(3);
-//
-// List<Integer> input12 = new ArrayList<Integer>();
-// input12.add(1);
-// input12.add(2);
-// input12.add(3);
-// input12.add(3);
-// input12.add(5);
-//
-// // Windows: (1,1,2)(1,1,2)|(2,3,3)(2,3,3)|(3,3)(3,3)|(5)(5)
-// // expected output: 3,2|3,3|2,2|0,1
-//
-// List<Integer> expected1 = new ArrayList<Integer>();
-// expected1.add(3);
-// expected1.add(2);
-// expected1.add(3);
-// expected1.add(3);
-// expected1.add(2);
-// expected1.add(2);
-// expected1.add(0);
-// expected1.add(1);
-//
-// List<Integer> actual1 = MockCoContext.createAndExecute(invokable1, input11, input12);
-// assertEquals(expected1, actual1);
-//
-// CoStreamWindow<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer> invokable2 = new CoStreamWindow<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer>(
-// new MyCoGroup2(), 2, 3, new TimestampWrapper<Tuple2<Integer, Integer>>(new MyTS2(),
-// 1), new TimestampWrapper<Tuple2<Integer, Integer>>(new MyTS2(), 1));
-//
-// // WindowSize 2, slide 3
-// // 1,2|4,5|7,8|
-//
-// List<Tuple2<Integer, Integer>> input21 = new ArrayList<Tuple2<Integer, Integer>>();
-// input21.add(new Tuple2<Integer, Integer>(1, 1));
-// input21.add(new Tuple2<Integer, Integer>(1, 2));
-// input21.add(new Tuple2<Integer, Integer>(2, 3));
-// input21.add(new Tuple2<Integer, Integer>(3, 4));
-// input21.add(new Tuple2<Integer, Integer>(3, 5));
-// input21.add(new Tuple2<Integer, Integer>(4, 6));
-// input21.add(new Tuple2<Integer, Integer>(4, 7));
-// input21.add(new Tuple2<Integer, Integer>(5, 8));
-//
-// List<Tuple2<Integer, Integer>> input22 = new ArrayList<Tuple2<Integer, Integer>>();
-// input22.add(new Tuple2<Integer, Integer>(1, 1));
-// input22.add(new Tuple2<Integer, Integer>(2, 0));
-// input22.add(new Tuple2<Integer, Integer>(2, 2));
-// input22.add(new Tuple2<Integer, Integer>(3, 9));
-// input22.add(new Tuple2<Integer, Integer>(3, 4));
-// input22.add(new Tuple2<Integer, Integer>(4, 10));
-// input22.add(new Tuple2<Integer, Integer>(5, 8));
-// input22.add(new Tuple2<Integer, Integer>(5, 7));
-//
-// List<Integer> expected2 = new ArrayList<Integer>();
-// expected2.add(1);
-// expected2.add(2);
-// expected2.add(8);
-// expected2.add(7);
-//
-// List<Integer> actual2 = MockCoContext.createAndExecute(invokable2, input21, input22);
-// assertEquals(expected2, actual2);
-// }
-//}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
deleted file mode 100644
index d00dc67..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.co;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.co.CoMapFunction;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.streaming.util.TestListResultSink;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-public class SelfConnectionTest extends StreamingMultipleProgramsTestBase {
-
- private static List<String> expected;
-
- /**
- * We connect two different data streams in a chain to a CoMap.
- */
- @Test
- public void differentDataStreamSameChain() {
-
- TestListResultSink<String> resultSink = new TestListResultSink<String>();
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
-
- DataStream<Integer> src = env.fromElements(1, 3, 5);
-
- DataStream<String> stringMap = src.map(new MapFunction<Integer, String>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public String map(Integer value) throws Exception {
- return "x " + value;
- }
- });
-
- stringMap.connect(src).map(new CoMapFunction<String, Integer, String>() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public String map1(String value) {
- return value;
- }
-
- @Override
- public String map2(Integer value) {
- return String.valueOf(value + 1);
- }
- }).addSink(resultSink);
-
- try {
- env.execute();
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- expected = new ArrayList<String>();
-
- expected.addAll(Arrays.asList("x 1", "x 3", "x 5", "2", "4", "6"));
-
- List<String> result = resultSink.getResult();
-
- Collections.sort(expected);
- Collections.sort(result);
-
- assertEquals(expected, result);
- }
-
- /**
- * We connect two different data streams in different chains to a CoMap.
- * (This is not actually self-connect.)
- */
- @Test
- public void differentDataStreamDifferentChain() {
-
- TestListResultSink<String> resultSink = new TestListResultSink<String>();
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(3);
-
- DataStream<Integer> src = env.fromElements(1, 3, 5).disableChaining();
-
- DataStream<String> stringMap = src.flatMap(new FlatMapFunction<Integer, String>() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void flatMap(Integer value, Collector<String> out) throws Exception {
- out.collect("x " + value);
- }
- }).keyBy(new KeySelector<String, Integer>() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public Integer getKey(String value) throws Exception {
- return value.length();
- }
- });
-
- DataStream<Long> longMap = src.map(new MapFunction<Integer, Long>() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public Long map(Integer value) throws Exception {
- return Long.valueOf(value + 1);
- }
- }).keyBy(new KeySelector<Long, Long>() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public Long getKey(Long value) throws Exception {
- return value;
- }
- });
-
-
- stringMap.connect(longMap).map(new CoMapFunction<String, Long, String>() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public String map1(String value) {
- return value;
- }
-
- @Override
- public String map2(Long value) {
- return value.toString();
- }
- }).addSink(resultSink);
-
- try {
- env.execute();
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- expected = new ArrayList<String>();
-
- expected.addAll(Arrays.asList("x 1", "x 3", "x 5", "2", "4", "6"));
-
- List<String> result = resultSink.getResult();
-
- Collections.sort(expected);
- Collections.sort(result);
-
- assertEquals(expected, result);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatITCase.java
deleted file mode 100644
index 5377e09..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatITCase.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.outputformat;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
-import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.util.Collector;
-
-public class CsvOutputFormatITCase extends StreamingProgramTestBase {
-
- protected String resultPath;
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void testProgram() throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStream<String> text = env.fromElements(WordCountData.TEXT);
-
- DataStream<Tuple2<String, Integer>> counts = text
- .flatMap(new Tokenizer())
- .keyBy(0).sum(1);
-
- counts.writeAsCsv(resultPath);
-
- env.execute("WriteAsCsvTest");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- //Strip the parentheses from the expected text like output
- compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES
- .replaceAll("[\\\\(\\\\)]", ""), resultPath);
- }
-
- public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void flatMap(String value, Collector<Tuple2<String, Integer>> out)
- throws Exception {
- // normalize and split the line
- String[] tokens = value.toLowerCase().split("\\W+");
-
- // emit the pairs
- for (String token : tokens) {
- if (token.length() > 0) {
- out.collect(new Tuple2<String, Integer>(token, 1));
- }
- }
- }
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/SocketOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/SocketOutputFormatITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/SocketOutputFormatITCase.java
deleted file mode 100644
index 49876ec..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/SocketOutputFormatITCase.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.outputformat;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.SocketOutputTestBase;
-import org.apache.flink.test.testdata.WordCountData;
-import org.junit.Ignore;
-
-@Ignore
-//This test sometimes failes most likely due to the behaviour
-//of the socket. Disabled for now.
-public class SocketOutputFormatITCase extends SocketOutputTestBase {
-
- @Override
- protected void testProgram() throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStream<String> text = env.fromElements(WordCountData.TEXT);
-
- DataStream<String> counts =
- text.flatMap(new CsvOutputFormatITCase.Tokenizer())
- .keyBy(0).sum(1).map(new MapFunction<Tuple2<String, Integer>, String>() {
- @Override
- public String map(Tuple2<String, Integer> value) throws Exception {
- return value.toString() + "\n";
- }
- });
- counts.writeToSocket(HOST, port, new DummyStringSchema());
-
- env.execute("WriteToSocketTest");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatITCase.java
deleted file mode 100644
index 380f00d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatITCase.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.outputformat;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
-import org.apache.flink.test.testdata.WordCountData;
-
-public class TextOutputFormatITCase extends StreamingProgramTestBase {
-
- protected String resultPath;
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void testProgram() throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStream<String> text = env.fromElements(WordCountData.TEXT);
-
- DataStream<Tuple2<String, Integer>> counts = text
- .flatMap(new CsvOutputFormatITCase.Tokenizer())
- .keyBy(0).sum(1);
-
- counts.writeAsText(resultPath);
-
- env.execute("WriteAsTextTest");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/MockRecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/MockRecordWriter.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/MockRecordWriter.java
deleted file mode 100644
index 317a21c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/MockRecordWriter.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamtask;
-
-import java.util.ArrayList;
-
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.runtime.operators.DataSourceTask;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-
-public class MockRecordWriter extends RecordWriter<SerializationDelegate<StreamRecord<Tuple1<Integer>>>> {
-
- public ArrayList<Integer> emittedRecords;
-
- public MockRecordWriter(DataSourceTask<?> inputBase, Class<StreamRecord<Tuple1<Integer>>> outputClass) {
- super(inputBase.getEnvironment().getWriter(0));
- }
-
- public boolean initList() {
- emittedRecords = new ArrayList<Integer>();
- return true;
- }
-
- @Override
- public void emit(SerializationDelegate<StreamRecord<Tuple1<Integer>>> record) {
- emittedRecords.add(record.getInstance().getValue().f0);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java
deleted file mode 100644
index 8f5f8df..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamtask;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import static org.junit.Assert.*;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({ ResultPartitionWriter.class })
-public class StreamIterationHeadTest {
-
- @Test
- public void testIterationHeadWatermarkEmission() throws Exception {
- StreamIterationHead<Integer> head = new StreamIterationHead<>();
- StreamTaskTestHarness<Integer> harness = new StreamTaskTestHarness<>(head,
- BasicTypeInfo.INT_TYPE_INFO);
- harness.getStreamConfig().setIterationId("1");
- harness.getStreamConfig().setIterationWaitTime(1);
-
- harness.invoke();
- harness.waitForTaskCompletion();
-
- assertEquals(1, harness.getOutput().size());
- assertEquals(new Watermark(Long.MAX_VALUE), harness.getOutput().peek());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java
deleted file mode 100644
index 122aa8a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamtask;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.co.CoMapFunction;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.junit.Test;
-
-public class StreamVertexTest extends StreamingMultipleProgramsTestBase {
-
- private static Map<Integer, Integer> data = new HashMap<Integer, Integer>();
-
- public static class MySource implements SourceFunction<Tuple1<Integer>> {
- private static final long serialVersionUID = 1L;
-
- private Tuple1<Integer> tuple = new Tuple1<Integer>(0);
-
- private int i = 0;
-
- @Override
- public void run(SourceContext<Tuple1<Integer>> ctx) throws Exception {
- for (int i = 0; i < 10; i++) {
- tuple.f0 = i;
- ctx.collect(tuple);
- }
- }
-
- @Override
- public void cancel() {
- }
- }
-
- public static class MyTask extends RichMapFunction<Tuple1<Integer>, Tuple2<Integer, Integer>> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Tuple2<Integer, Integer> map(Tuple1<Integer> value) throws Exception {
- Integer i = value.f0;
- return new Tuple2<Integer, Integer>(i, i + 1);
- }
- }
-
- public static class MySink implements SinkFunction<Tuple2<Integer, Integer>> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void invoke(Tuple2<Integer, Integer> tuple) {
- Integer k = tuple.getField(0);
- Integer v = tuple.getField(1);
- data.put(k, v);
- }
-
- }
-
- @SuppressWarnings("unused")
- private static final int SOURCE_PARALELISM = 1;
-
- @Test
- public void wrongJobGraph() {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(SOURCE_PARALELISM);
-
- try {
- env.fromCollection(null);
- fail();
- } catch (NullPointerException e) {
- }
-
- try {
- env.fromElements();
- fail();
- } catch (IllegalArgumentException e) {
- }
-
- try {
- env.generateSequence(-10, -30);
- fail();
- } catch (IllegalArgumentException e) {
- }
-
- try {
- env.setBufferTimeout(-10);
- fail();
- } catch (IllegalArgumentException e) {
- }
-
- try {
- env.generateSequence(1, 10).project(2);
- fail();
- } catch (RuntimeException e) {
- }
- }
-
- private static class CoMap implements CoMapFunction<String, Long, String> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public String map1(String value) {
- // System.out.println(value);
- return value;
- }
-
- @Override
- public String map2(Long value) {
- // System.out.println(value);
- return value.toString();
- }
- }
-
- private static class SetSink implements SinkFunction<String> {
- private static final long serialVersionUID = 1L;
- public static Set<String> result = Collections.synchronizedSet(new HashSet<String>());
-
- @Override
- public void invoke(String value) {
- result.add(value);
- }
-
- }
-
- @Test
- public void coTest() throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(SOURCE_PARALELISM);
-
- DataStream<String> fromStringElements = env.fromElements("aa", "bb", "cc");
- DataStream<Long> generatedSequence = env.generateSequence(0, 3);
-
- fromStringElements.connect(generatedSequence).map(new CoMap()).addSink(new SetSink());
-
- env.execute();
-
- HashSet<String> expectedSet = new HashSet<String>(Arrays.asList("aa", "bb", "cc", "0", "1",
- "2", "3"));
- assertEquals(expectedSet, SetSink.result);
- }
-
- @Test
- public void runStream() throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(SOURCE_PARALELISM);
-
- env.addSource(new MySource()).setParallelism(SOURCE_PARALELISM).map(new MyTask())
- .addSink(new MySink());
-
- env.execute();
- assertEquals(10, data.keySet().size());
-
- for (Integer k : data.keySet()) {
- assertEquals((Integer) (k + 1), data.get(k));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistanceTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistanceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistanceTest.java
deleted file mode 100644
index bdc7e94..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistanceTest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.deltafunction;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.streaming.api.functions.windowing.delta.CosineDistance;
-import org.junit.Test;
-
-public class CosineDistanceTest {
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
- public void testCosineDistance() {
-
- //Reference calculated using wolfram alpha
- double[][][] testdata={
- {{0,0,0},{0,0,0}},
- {{0,0,0},{1,2,3}},
- {{1,2,3},{0,0,0}},
- {{1,2,3},{4,5,6}},
- {{1,2,3},{-4,-5,-6}},
- {{1,2,-3},{-4,5,-6}},
- {{1,2,3,4},{5,6,7,8}},
- {{1,2},{3,4}},
- {{1},{2}},
- };
- double[] referenceSolutions={
- 0,
- 0,
- 0,
- 0.025368,
- 1.974631,
- 0.269026,
- 0.031136,
- 0.016130,
- 0
- };
-
- for (int i = 0; i < testdata.length; i++) {
- assertEquals("Wrong result for inputs " + arrayToString(testdata[i][0]) + " and "
- + arrayToString(testdata[i][0]), referenceSolutions[i],
- new CosineDistance().getDelta(testdata[i][0], testdata[i][1]), 0.000001);
- }
- }
-
- private String arrayToString(double[] in){
- if (in.length==0) return "{}";
- String result="{";
- for (double d:in){
- result+=d+",";
- }
- return result.substring(0, result.length()-1)+"}";
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistanceTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistanceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistanceTest.java
deleted file mode 100644
index 85a0882..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistanceTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.deltafunction;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.streaming.api.functions.windowing.delta.EuclideanDistance;
-import org.junit.Test;
-
-public class EuclideanDistanceTest {
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
- public void testEuclideanDistance() {
-
- //Reference calculated using wolfram alpha
- double[][][] testdata={
- {{0,0,0},{0,0,0}},
- {{0,0,0},{1,2,3}},
- {{1,2,3},{0,0,0}},
- {{1,2,3},{4,5,6}},
- {{1,2,3},{-4,-5,-6}},
- {{1,2,-3},{-4,5,-6}},
- {{1,2,3,4},{5,6,7,8}},
- {{1,2},{3,4}},
- {{1},{2}},
- };
- double[] referenceSolutions={
- 0,
- 3.741657,
- 3.741657,
- 5.196152,
- 12.4499,
- 6.557439,
- 8.0,
- 2.828427,
- 1
- };
-
- for (int i = 0; i < testdata.length; i++) {
- assertEquals("Wrong result for inputs " + arrayToString(testdata[i][0]) + " and "
- + arrayToString(testdata[i][0]), referenceSolutions[i],
- new EuclideanDistance().getDelta(testdata[i][0], testdata[i][1]), 0.000001);
- }
-
- }
-
- private String arrayToString(double[] in){
- if (in.length==0) return "{}";
- String result="{";
- for (double d:in){
- result+=d+",";
- }
- return result.substring(0, result.length()-1)+"}";
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java
deleted file mode 100644
index 9d9d47b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.graph;
-
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-@SuppressWarnings("serial")
-public class TranslationTest extends StreamingMultipleProgramsTestBase {
-
- @Test
- public void testCheckpointModeTranslation() {
- try {
- // with deactivated fault tolerance, the checkpoint mode should be at-least-once
- StreamExecutionEnvironment deactivated = getSimpleJob();
-
- for (JobVertex vertex : deactivated.getStreamGraph().getJobGraph().getVertices()) {
- assertEquals(CheckpointingMode.AT_LEAST_ONCE, new StreamConfig(vertex.getConfiguration()).getCheckpointMode());
- }
-
- // with activated fault tolerance, the checkpoint mode should be by default exactly once
- StreamExecutionEnvironment activated = getSimpleJob();
- activated.enableCheckpointing(1000L);
- for (JobVertex vertex : activated.getStreamGraph().getJobGraph().getVertices()) {
- assertEquals(CheckpointingMode.EXACTLY_ONCE, new StreamConfig(vertex.getConfiguration()).getCheckpointMode());
- }
-
- // explicitly setting the mode
- StreamExecutionEnvironment explicit = getSimpleJob();
- explicit.enableCheckpointing(1000L, CheckpointingMode.AT_LEAST_ONCE);
- for (JobVertex vertex : explicit.getStreamGraph().getJobGraph().getVertices()) {
- assertEquals(CheckpointingMode.AT_LEAST_ONCE, new StreamConfig(vertex.getConfiguration()).getCheckpointMode());
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- private static StreamExecutionEnvironment getSimpleJob() {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.generateSequence(1, 10000000)
- .addSink(new SinkFunction<Long>() {
- @Override
- public void invoke(Long value) {
- }
- });
-
- return env;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
deleted file mode 100644
index a8a989b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.flink.core.memory.MemoryType;
-import org.apache.flink.runtime.event.TaskEvent;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferPool;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-
-import org.junit.Test;
-
-/**
- * The test generates two random streams (input channels) which independently
- * and randomly generate checkpoint barriers. The two streams are very
- * unaligned, putting heavy work on the BarrierBuffer.
- */
-public class BarrierBufferMassiveRandomTest {
-
- private static final int PAGE_SIZE = 1024;
-
- @Test
- public void testWithTwoChannelsAndRandomBarriers() {
- IOManager ioMan = null;
- try {
- ioMan = new IOManagerAsync();
-
- BufferPool pool1 = new NetworkBufferPool(100, PAGE_SIZE, MemoryType.HEAP).createBufferPool(100, true);
- BufferPool pool2 = new NetworkBufferPool(100, PAGE_SIZE, MemoryType.HEAP).createBufferPool(100, true);
-
- RandomGeneratingInputGate myIG = new RandomGeneratingInputGate(
- new BufferPool[] { pool1, pool2 },
- new BarrierGenerator[] { new CountBarrier(100000), new RandomBarrier(100000) });
-
- BarrierBuffer barrierBuffer = new BarrierBuffer(myIG, ioMan);
-
- for (int i = 0; i < 2000000; i++) {
- BufferOrEvent boe = barrierBuffer.getNextNonBlocked();
- if (boe.isBuffer()) {
- boe.getBuffer().recycle();
- }
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- finally {
- if (ioMan != null) {
- ioMan.shutdown();
- }
- }
- }
-
- // ------------------------------------------------------------------------
- // Mocks and Generators
- // ------------------------------------------------------------------------
-
- protected interface BarrierGenerator {
- public boolean isNextBarrier();
- }
-
- protected static class RandomBarrier implements BarrierGenerator {
-
- private static final Random rnd = new Random();
-
- private final double threshold;
-
- public RandomBarrier(double expectedEvery) {
- threshold = 1 / expectedEvery;
- }
-
- @Override
- public boolean isNextBarrier() {
- return rnd.nextDouble() < threshold;
- }
- }
-
- private static class CountBarrier implements BarrierGenerator {
-
- private final long every;
- private long c = 0;
-
- public CountBarrier(long every) {
- this.every = every;
- }
-
- @Override
- public boolean isNextBarrier() {
- return c++ % every == 0;
- }
- }
-
- protected static class RandomGeneratingInputGate implements InputGate {
-
- private final int numChannels;
- private final BufferPool[] bufferPools;
- private final int[] currentBarriers;
- private final BarrierGenerator[] barrierGens;
- private int currentChannel = 0;
- private long c = 0;
-
- public RandomGeneratingInputGate(BufferPool[] bufferPools, BarrierGenerator[] barrierGens) {
- this.numChannels = bufferPools.length;
- this.currentBarriers = new int[numChannels];
- this.bufferPools = bufferPools;
- this.barrierGens = barrierGens;
- }
-
- @Override
- public int getNumberOfInputChannels() {
- return numChannels;
- }
-
- @Override
- public boolean isFinished() {
- return false;
- }
-
- @Override
- public void requestPartitions() {}
-
- @Override
- public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
- currentChannel = (currentChannel + 1) % numChannels;
-
- if (barrierGens[currentChannel].isNextBarrier()) {
- return new BufferOrEvent(
- new CheckpointBarrier(++currentBarriers[currentChannel], System.currentTimeMillis()),
- currentChannel);
- } else {
- Buffer buffer = bufferPools[currentChannel].requestBuffer();
- buffer.getMemorySegment().putLong(0, c++);
- return new BufferOrEvent(buffer, currentChannel);
- }
- }
-
- @Override
- public void sendTaskEvent(TaskEvent event) {}
-
- @Override
- public void registerListener(EventListener<InputGate> listener) {}
-
- @Override
- public int getPageSize() {
- return PAGE_SIZE;
- }
- }
-}