You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:42 UTC
[26/51] [partial] flink git commit: [FLINK-2877] Move Streaming API
out of Staging package
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ConcatenatedExtractTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ConcatenatedExtractTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ConcatenatedExtractTest.java
deleted file mode 100644
index 3b098c3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ConcatenatedExtractTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.junit.Before;
-import org.junit.Test;
-
-public class ConcatenatedExtractTest {
-
- private String[] testStringArray1 = { "1", "2", "3" };
- private int[] testIntArray1 = { 1, 2, 3 };
- private String[] testStringArray2 = { "4", "5", "6" };
- private int[] testIntArray2 = { 4, 5, 6 };
- private String[] testStringArray3 = { "7", "8", "9" };
- private int[] testIntArray3 = { 7, 8, 9 };
- private Tuple2<String[], int[]>[] testTuple2Array;
- private Tuple2<String[], int[]> testTuple2;
- private Tuple2<Tuple2<String[], int[]>, Tuple2<String[], int[]>[]> testData;
-
- @SuppressWarnings("unchecked")
- @Before
- public void setupData() {
- testTuple2Array = new Tuple2[2];
- testTuple2Array[0] = new Tuple2<String[], int[]>(testStringArray1, testIntArray2);
- testTuple2Array[1] = new Tuple2<String[], int[]>(testStringArray2, testIntArray1);
-
- testTuple2 = new Tuple2<String[], int[]>(testStringArray3, testIntArray3);
-
- testData = new Tuple2<Tuple2<String[], int[]>, Tuple2<String[], int[]>[]>(testTuple2,
- testTuple2Array);
- }
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
- public void test1() {
- Extractor ext = new ConcatenatedExtract(new FieldFromTuple(0), new FieldFromTuple(1))
- .add(new FieldsFromArray(Integer.class, 2, 1, 0));
- int[] expected = { testIntArray3[2], testIntArray3[1], testIntArray3[0] };
- assertEquals(new Integer(expected[0]), ((Integer[]) ext.extract(testData))[0]);
- assertEquals(new Integer(expected[1]), ((Integer[]) ext.extract(testData))[1]);
- assertEquals(new Integer(expected[2]), ((Integer[]) ext.extract(testData))[2]);
- }
-
- @SuppressWarnings({ "unchecked", "rawtypes" })
- @Test
- public void test2() {
- Extractor ext = new ConcatenatedExtract(new FieldFromTuple(1), // Tuple2<String[],int[]>[]
- new FieldsFromArray(Tuple2.class, 1)) // Tuple2<String[],int[]>[]
- .add(new FieldFromArray(0)) // Tuple2<String[],int[]>
- .add(new ArrayFromTuple(0)) // Object[] (Containing String[])
- .add(new FieldFromArray(0)) // String[]
- .add(new FieldFromArray(1)); // String
-
- String expected2 = testStringArray2[1];
- assertEquals(expected2, ext.extract(testData));
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromArrayTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromArrayTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromArrayTest.java
deleted file mode 100644
index d274f4e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromArrayTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class FieldFromArrayTest {
-
- String[] testStringArray = { "0", "1", "2", "3", "4" };
- Integer[] testIntegerArray = { 10, 11, 12, 13, 14 };
- int[] testIntArray = { 20, 21, 22, 23, 24 };
-
- @Test
- public void testStringArray() {
- for (int i = 0; i < this.testStringArray.length; i++) {
- assertEquals(this.testStringArray[i],
- new FieldFromArray<String>(i).extract(testStringArray));
- }
- }
-
- @Test
- public void testIntegerArray() {
- for (int i = 0; i < this.testIntegerArray.length; i++) {
- assertEquals(this.testIntegerArray[i],
- new FieldFromArray<String>(i).extract(testIntegerArray));
- }
- }
-
- @Test
- public void testIntArray() {
- for (int i = 0; i < this.testIntArray.length; i++) {
- assertEquals(new Integer(this.testIntArray[i]),
- new FieldFromArray<Integer>(i).extract(testIntArray));
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromTupleTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromTupleTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromTupleTest.java
deleted file mode 100644
index c05f281..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromTupleTest.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple10;
-import org.apache.flink.api.java.tuple.Tuple11;
-import org.apache.flink.api.java.tuple.Tuple12;
-import org.apache.flink.api.java.tuple.Tuple13;
-import org.apache.flink.api.java.tuple.Tuple14;
-import org.apache.flink.api.java.tuple.Tuple15;
-import org.apache.flink.api.java.tuple.Tuple16;
-import org.apache.flink.api.java.tuple.Tuple17;
-import org.apache.flink.api.java.tuple.Tuple18;
-import org.apache.flink.api.java.tuple.Tuple19;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple20;
-import org.apache.flink.api.java.tuple.Tuple21;
-import org.apache.flink.api.java.tuple.Tuple22;
-import org.apache.flink.api.java.tuple.Tuple23;
-import org.apache.flink.api.java.tuple.Tuple24;
-import org.apache.flink.api.java.tuple.Tuple25;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.tuple.Tuple6;
-import org.apache.flink.api.java.tuple.Tuple7;
-import org.apache.flink.api.java.tuple.Tuple8;
-import org.apache.flink.api.java.tuple.Tuple9;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class FieldFromTupleTest {
-
- private String[] testStrings;
-
- @Before
- public void init() {
- testStrings = new String[Tuple.MAX_ARITY];
- for (int i = 0; i < Tuple.MAX_ARITY; i++) {
- testStrings[i] = Integer.toString(i);
- }
- }
-
- @Test
- public void testSingleFieldExtraction() throws InstantiationException, IllegalAccessException {
- // extract single fields
- for (int i = 0; i < Tuple.MAX_ARITY; i++) {
- Tuple current = (Tuple) CLASSES[i].newInstance();
- for (int j = 0; j < i; j++) {
- current.setField(testStrings[j], j);
- }
- for (int j = 0; j < i; j++) {
- assertEquals(testStrings[j], new FieldFromTuple<String>(j).extract(current));
- }
- }
- }
-
- private static final Class<?>[] CLASSES = new Class<?>[] { Tuple1.class, Tuple2.class,
- Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class,
- Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class,
- Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class,
- Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class,
- Tuple24.class, Tuple25.class };
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromArrayTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromArrayTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromArrayTest.java
deleted file mode 100644
index 7a9a716..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromArrayTest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.flink.streaming.api.functions.windowing.delta.extractor.FieldsFromArray;
-import org.junit.Test;
-
-public class FieldsFromArrayTest {
-
- String[] testStringArray = { "0", "1", "2", "3", "4" };
- Integer[] testIntegerArray = { 10, 11, 12, 13, 14 };
- int[] testIntArray = { 20, 21, 22, 23, 24 };
-
- @Test
- public void testStringArray() {
- // check single field extraction
- for (int i = 0; i < testStringArray.length; i++) {
- String[] tmp = { testStringArray[i] };
- arrayEqualityCheck(tmp,
- new FieldsFromArray<String>(String.class, i).extract(testStringArray));
- }
-
- // check reverse order
- String[] reverseOrder = new String[testStringArray.length];
- for (int i = 0; i < testStringArray.length; i++) {
- reverseOrder[i] = testStringArray[testStringArray.length - i - 1];
- }
- arrayEqualityCheck(reverseOrder,
- new FieldsFromArray<String>(String.class, 4, 3, 2, 1, 0).extract(testStringArray));
-
- // check picking fields and reorder
- String[] crazyOrder = { testStringArray[4], testStringArray[1], testStringArray[2] };
- arrayEqualityCheck(crazyOrder,
- new FieldsFromArray<String>(String.class, 4, 1, 2).extract(testStringArray));
- }
-
- @Test
- public void testIntegerArray() {
- // check single field extraction
- for (int i = 0; i < testIntegerArray.length; i++) {
- Integer[] tmp = { testIntegerArray[i] };
- arrayEqualityCheck(tmp,
- new FieldsFromArray<Integer>(Integer.class, i).extract(testIntegerArray));
- }
-
- // check reverse order
- Integer[] reverseOrder = new Integer[testIntegerArray.length];
- for (int i = 0; i < testIntegerArray.length; i++) {
- reverseOrder[i] = testIntegerArray[testIntegerArray.length - i - 1];
- }
- arrayEqualityCheck(reverseOrder,
- new FieldsFromArray<Integer>(Integer.class, 4, 3, 2, 1, 0)
- .extract(testIntegerArray));
-
- // check picking fields and reorder
- Integer[] crazyOrder = { testIntegerArray[4], testIntegerArray[1], testIntegerArray[2] };
- arrayEqualityCheck(crazyOrder,
- new FieldsFromArray<Integer>(Integer.class, 4, 1, 2).extract(testIntegerArray));
-
- }
-
- @Test
- public void testIntArray() {
- for (int i = 0; i < testIntArray.length; i++) {
- Integer[] tmp = { testIntArray[i] };
- arrayEqualityCheck(tmp,
- new FieldsFromArray<Integer>(Integer.class, i).extract(testIntArray));
- }
-
- // check reverse order
- Integer[] reverseOrder = new Integer[testIntArray.length];
- for (int i = 0; i < testIntArray.length; i++) {
- reverseOrder[i] = testIntArray[testIntArray.length - i - 1];
- }
- arrayEqualityCheck(reverseOrder,
- new FieldsFromArray<Integer>(Integer.class, 4, 3, 2, 1, 0).extract(testIntArray));
-
- // check picking fields and reorder
- Integer[] crazyOrder = { testIntArray[4], testIntArray[1], testIntArray[2] };
- arrayEqualityCheck(crazyOrder,
- new FieldsFromArray<Integer>(Integer.class, 4, 1, 2).extract(testIntArray));
-
- }
-
- private void arrayEqualityCheck(Object[] array1, Object[] array2) {
- assertEquals("The result arrays must have the same length", array1.length, array2.length);
- for (int i = 0; i < array1.length; i++) {
- assertEquals("Unequal fields at position " + i, array1[i], array2[i]);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromTupleTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromTupleTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromTupleTest.java
deleted file mode 100644
index 025ed8a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromTupleTest.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple10;
-import org.apache.flink.api.java.tuple.Tuple11;
-import org.apache.flink.api.java.tuple.Tuple12;
-import org.apache.flink.api.java.tuple.Tuple13;
-import org.apache.flink.api.java.tuple.Tuple14;
-import org.apache.flink.api.java.tuple.Tuple15;
-import org.apache.flink.api.java.tuple.Tuple16;
-import org.apache.flink.api.java.tuple.Tuple17;
-import org.apache.flink.api.java.tuple.Tuple18;
-import org.apache.flink.api.java.tuple.Tuple19;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple20;
-import org.apache.flink.api.java.tuple.Tuple21;
-import org.apache.flink.api.java.tuple.Tuple22;
-import org.apache.flink.api.java.tuple.Tuple23;
-import org.apache.flink.api.java.tuple.Tuple24;
-import org.apache.flink.api.java.tuple.Tuple25;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.tuple.Tuple6;
-import org.apache.flink.api.java.tuple.Tuple7;
-import org.apache.flink.api.java.tuple.Tuple8;
-import org.apache.flink.api.java.tuple.Tuple9;
-import org.apache.flink.streaming.api.functions.windowing.delta.extractor.FieldsFromTuple;
-import org.junit.Before;
-import org.junit.Test;
-
-public class FieldsFromTupleTest {
-
- private double[] testDouble;
-
- @Before
- public void init() {
- testDouble = new double[Tuple.MAX_ARITY];
- for (int i = 0; i < Tuple.MAX_ARITY; i++) {
- testDouble[i] = i;
- }
- }
-
- @Test
- public void testUserSpecifiedOrder() throws InstantiationException, IllegalAccessException {
- Tuple currentTuple = (Tuple) CLASSES[Tuple.MAX_ARITY - 1].newInstance();
- for (int i = 0; i < Tuple.MAX_ARITY; i++) {
- currentTuple.setField(testDouble[i], i);
- }
-
- double[] expected = { testDouble[5], testDouble[3], testDouble[6], testDouble[7],
- testDouble[0] };
- arrayEqualityCheck(expected, new FieldsFromTuple(5, 3, 6, 7, 0).extract(currentTuple));
-
- double[] expected2 = { testDouble[0], testDouble[Tuple.MAX_ARITY - 1] };
- arrayEqualityCheck(expected2,
- new FieldsFromTuple(0, Tuple.MAX_ARITY - 1).extract(currentTuple));
-
- double[] expected3 = { testDouble[Tuple.MAX_ARITY - 1], testDouble[0] };
- arrayEqualityCheck(expected3,
- new FieldsFromTuple(Tuple.MAX_ARITY - 1, 0).extract(currentTuple));
-
- double[] expected4 = { testDouble[13], testDouble[4], testDouble[5], testDouble[4],
- testDouble[2], testDouble[8], testDouble[6], testDouble[2], testDouble[8],
- testDouble[3], testDouble[5], testDouble[2], testDouble[16], testDouble[4],
- testDouble[3], testDouble[2], testDouble[6], testDouble[4], testDouble[7],
- testDouble[4], testDouble[2], testDouble[8], testDouble[7], testDouble[2] };
- arrayEqualityCheck(expected4, new FieldsFromTuple(13, 4, 5, 4, 2, 8, 6, 2, 8, 3, 5, 2, 16,
- 4, 3, 2, 6, 4, 7, 4, 2, 8, 7, 2).extract(currentTuple));
- }
-
- private void arrayEqualityCheck(double[] array1, double[] array2) {
- assertEquals("The result arrays must have the same length", array1.length, array2.length);
- for (int i = 0; i < array1.length; i++) {
- assertEquals("Unequal fields at position " + i, array1[i], array2[i], 0d);
- }
- }
-
- private static final Class<?>[] CLASSES = new Class<?>[] { Tuple1.class, Tuple2.class,
- Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class,
- Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class,
- Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class,
- Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class,
- Tuple24.class, Tuple25.class };
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java
deleted file mode 100644
index 8038cfb..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.graph;
-
-import static org.junit.Assert.*;
-
-import java.util.List;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class SlotAllocationTest {
-
- @Test
- public void test() {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- FilterFunction<Long> dummyFilter = new FilterFunction<Long>() {
- @Override
- public boolean filter(Long value) { return false; }
- };
-
- env.generateSequence(1, 10).filter(dummyFilter).isolateResources().filter(dummyFilter)
- .disableChaining().filter(dummyFilter).startNewResourceGroup().filter(dummyFilter)
- .startNewChain().print();
-
- JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-
- List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
-
- assertEquals(vertices.get(0).getSlotSharingGroup(), vertices.get(2).getSlotSharingGroup());
- assertNotEquals(vertices.get(0).getSlotSharingGroup(), vertices.get(1).getSlotSharingGroup());
- assertNotEquals(vertices.get(2).getSlotSharingGroup(), vertices.get(3).getSlotSharingGroup());
- assertEquals(vertices.get(3).getSlotSharingGroup(), vertices.get(4).getSlotSharingGroup());
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
deleted file mode 100644
index c316604..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ /dev/null
@@ -1,301 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.graph;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.datastream.ConnectedStreams;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
-import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
-import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
-import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
-import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.util.EvenOddOutputSelector;
-import org.apache.flink.streaming.util.NoOpIntMap;
-import org.apache.flink.streaming.util.NoOpSink;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests for {@link StreamGraphGenerator}. This only tests correct translation of split/select,
- * union, partitioning since the other translation routines are tested already in operation
- * specific tests, for example in {@link org.apache.flink.streaming.api.IterateTest} for
- * iterations.
- */
-public class StreamGraphGeneratorTest extends StreamingMultipleProgramsTestBase {
-
- /**
- * This tests whether virtual Transformations behave correctly.
- *
- * <p>
- * Verifies that partitioning, output selector, selected names are correctly set in the
- * StreamGraph when they are intermixed.
- */
- @Test
- public void testVirtualTransformations() throws Exception {
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStream<Integer> source = env.fromElements(1, 10);
-
- DataStream<Integer> rebalanceMap = source.rebalance().map(new NoOpIntMap());
-
- // verify that only the partitioning that was set last is used
- DataStream<Integer> broadcastMap = rebalanceMap
- .forward()
- .global()
- .broadcast()
- .map(new NoOpIntMap());
-
- broadcastMap.addSink(new NoOpSink<Integer>());
-
- // verify that partitioning is preserved across union and split/select
- EvenOddOutputSelector selector1 = new EvenOddOutputSelector();
- EvenOddOutputSelector selector2 = new EvenOddOutputSelector();
- EvenOddOutputSelector selector3 = new EvenOddOutputSelector();
-
- DataStream<Integer> map1Operator = rebalanceMap
- .map(new NoOpIntMap());
-
- DataStream<Integer> map1 = map1Operator
- .broadcast()
- .split(selector1)
- .select("even");
-
- DataStream<Integer> map2Operator = rebalanceMap
- .map(new NoOpIntMap());
-
- DataStream<Integer> map2 = map2Operator
- .split(selector2)
- .select("odd")
- .global();
-
- DataStream<Integer> map3Operator = rebalanceMap
- .map(new NoOpIntMap());
-
- DataStream<Integer> map3 = map3Operator
- .global()
- .split(selector3)
- .select("even")
- .shuffle();
-
-
- SingleOutputStreamOperator<Integer, ?> unionedMap = map1.union(map2).union(map3)
- .map(new NoOpIntMap());
-
- unionedMap.addSink(new NoOpSink<Integer>());
-
- StreamGraph graph = env.getStreamGraph();
-
- // rebalanceMap
- assertTrue(graph.getStreamNode(rebalanceMap.getId()).getInEdges().get(0).getPartitioner() instanceof RebalancePartitioner);
-
- // verify that only last partitioning takes precedence
- assertTrue(graph.getStreamNode(broadcastMap.getId()).getInEdges().get(0).getPartitioner() instanceof BroadcastPartitioner);
- assertEquals(rebalanceMap.getId(), graph.getStreamNode(broadcastMap.getId()).getInEdges().get(0).getSourceVertex().getId());
-
- // verify that partitioning in unions is preserved and that it works across split/select
- assertTrue(graph.getStreamNode(map1Operator.getId()).getOutEdges().get(0).getPartitioner() instanceof BroadcastPartitioner);
- assertTrue(graph.getStreamNode(map1Operator.getId()).getOutEdges().get(0).getSelectedNames().get(0).equals("even"));
- assertTrue(graph.getStreamNode(map1Operator.getId()).getOutputSelectors().contains(selector1));
-
- assertTrue(graph.getStreamNode(map2Operator.getId()).getOutEdges().get(0).getPartitioner() instanceof GlobalPartitioner);
- assertTrue(graph.getStreamNode(map2Operator.getId()).getOutEdges().get(0).getSelectedNames().get(0).equals("odd"));
- assertTrue(graph.getStreamNode(map2Operator.getId()).getOutputSelectors().contains(selector2));
-
- assertTrue(graph.getStreamNode(map3Operator.getId()).getOutEdges().get(0).getPartitioner() instanceof ShufflePartitioner);
- assertTrue(graph.getStreamNode(map3Operator.getId()).getOutEdges().get(0).getSelectedNames().get(0).equals("even"));
- assertTrue(graph.getStreamNode(map3Operator.getId()).getOutputSelectors().contains(selector3));
- }
-
- /**
- * This tests whether virtual Transformations behave correctly.
- *
- * Checks whether output selector, partitioning works correctly when applied on a union.
- */
- @Test
- public void testVirtualTransformations2() throws Exception {
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStream<Integer> source = env.fromElements(1, 10);
-
- DataStream<Integer> rebalanceMap = source.rebalance().map(new NoOpIntMap());
-
- DataStream<Integer> map1 = rebalanceMap
- .map(new NoOpIntMap());
-
- DataStream<Integer> map2 = rebalanceMap
- .map(new NoOpIntMap());
-
- DataStream<Integer> map3 = rebalanceMap
- .map(new NoOpIntMap());
-
- EvenOddOutputSelector selector = new EvenOddOutputSelector();
-
- SingleOutputStreamOperator<Integer, ?> unionedMap = map1.union(map2).union(map3)
- .broadcast()
- .split(selector)
- .select("foo")
- .map(new NoOpIntMap());
-
- unionedMap.addSink(new NoOpSink<Integer>());
-
- StreamGraph graph = env.getStreamGraph();
-
- // verify that the properties are correctly set on all input operators
- assertTrue(graph.getStreamNode(map1.getId()).getOutEdges().get(0).getPartitioner() instanceof BroadcastPartitioner);
- assertTrue(graph.getStreamNode(map1.getId()).getOutEdges().get(0).getSelectedNames().get(0).equals("foo"));
- assertTrue(graph.getStreamNode(map1.getId()).getOutputSelectors().contains(selector));
-
- assertTrue(graph.getStreamNode(map2.getId()).getOutEdges().get(0).getPartitioner() instanceof BroadcastPartitioner);
- assertTrue(graph.getStreamNode(map2.getId()).getOutEdges().get(0).getSelectedNames().get(0).equals("foo"));
- assertTrue(graph.getStreamNode(map2.getId()).getOutputSelectors().contains(selector));
-
- assertTrue(graph.getStreamNode(map3.getId()).getOutEdges().get(0).getPartitioner() instanceof BroadcastPartitioner);
- assertTrue(graph.getStreamNode(map3.getId()).getOutEdges().get(0).getSelectedNames().get(0).equals("foo"));
- assertTrue(graph.getStreamNode(map3.getId()).getOutputSelectors().contains(selector));
-
- }
-
- /**
- * Test whether an {@link OutputTypeConfigurable} implementation gets called with the correct
- * output type. In this test case the output type must be BasicTypeInfo.INT_TYPE_INFO.
- *
- * @throws Exception
- */
- @Test
- public void testOutputTypeConfigurationWithOneInputTransformation() throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStream<Integer> source = env.fromElements(1, 10);
-
- OutputTypeConfigurableOperationWithOneInput outputTypeConfigurableOperation = new OutputTypeConfigurableOperationWithOneInput();
-
- DataStream<Integer> result = source.transform(
- "Single input and output type configurable operation",
- BasicTypeInfo.INT_TYPE_INFO,
- outputTypeConfigurableOperation);
-
- result.addSink(new NoOpSink<Integer>());
-
- StreamGraph graph = env.getStreamGraph();
-
- assertEquals(BasicTypeInfo.INT_TYPE_INFO, outputTypeConfigurableOperation.getTypeInformation());
- }
-
- @Test
- public void testOutputTypeConfigurationWithTwoInputTransformation() throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStream<Integer> source1 = env.fromElements(1, 10);
- DataStream<Integer> source2 = env.fromElements(2, 11);
-
- ConnectedStreams<Integer, Integer> connectedSource = source1.connect(source2);
-
- OutputTypeConfigurableOperationWithTwoInputs outputTypeConfigurableOperation = new OutputTypeConfigurableOperationWithTwoInputs();
-
- DataStream<Integer> result = connectedSource.transform(
- "Two input and output type configurable operation",
- BasicTypeInfo.INT_TYPE_INFO,
- outputTypeConfigurableOperation);
-
- result.addSink(new NoOpSink<Integer>());
-
- StreamGraph graph = env.getStreamGraph();
-
- assertEquals(BasicTypeInfo.INT_TYPE_INFO, outputTypeConfigurableOperation.getTypeInformation());
- }
-
- private static class OutputTypeConfigurableOperationWithTwoInputs
- extends AbstractStreamOperator<Integer>
- implements TwoInputStreamOperator<Integer, Integer, Integer>, OutputTypeConfigurable<Integer> {
-
- TypeInformation<Integer> tpeInformation;
-
- public TypeInformation<Integer> getTypeInformation() {
- return tpeInformation;
- }
-
- @Override
- public void setOutputType(TypeInformation<Integer> outTypeInfo, ExecutionConfig executionConfig) {
- tpeInformation = outTypeInfo;
- }
-
- @Override
- public void processElement1(StreamRecord element) throws Exception {
- output.collect(element);
- }
-
- @Override
- public void processElement2(StreamRecord element) throws Exception {
- output.collect(element);
- }
-
- @Override
- public void processWatermark1(Watermark mark) throws Exception {}
-
- @Override
- public void processWatermark2(Watermark mark) throws Exception {}
-
- @Override
- public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<Integer>> output) {}
- }
-
- private static class OutputTypeConfigurableOperationWithOneInput
- extends AbstractStreamOperator<Integer>
- implements OneInputStreamOperator<Integer, Integer>, OutputTypeConfigurable<Integer> {
-
- TypeInformation<Integer> tpeInformation;
-
- public TypeInformation<Integer> getTypeInformation() {
- return tpeInformation;
- }
-
- @Override
- public void processElement(StreamRecord<Integer> element) throws Exception {
- output.collect(element);
- }
-
- @Override
- public void processWatermark(Watermark mark) throws Exception {
-
- }
-
- @Override
- public void setOutputType(TypeInformation<Integer> outTypeInfo, ExecutionConfig executionConfig) {
- tpeInformation = outTypeInfo;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
deleted file mode 100644
index e806428..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.graph;
-
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.apache.flink.util.InstantiationUtil;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class StreamingJobGraphGeneratorTest extends StreamingMultipleProgramsTestBase {
- private static final Logger LOG = LoggerFactory.getLogger(StreamingJobGraphGeneratorTest.class);
-
- @Test
- public void testExecutionConfigSerialization() throws IOException, ClassNotFoundException {
- final long seed = System.currentTimeMillis();
- LOG.info("Test seed: {}", new Long(seed));
- final Random r = new Random(seed);
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- StreamGraph streamingJob = new StreamGraph(env);
- StreamingJobGraphGenerator compiler = new StreamingJobGraphGenerator(streamingJob);
-
- boolean closureCleanerEnabled = r.nextBoolean(), forceAvroEnabled = r.nextBoolean(), forceKryoEnabled = r.nextBoolean(), objectReuseEnabled = r.nextBoolean(), sysoutLoggingEnabled = r.nextBoolean();
- int dop = 1 + r.nextInt(10);
-
- ExecutionConfig config = streamingJob.getExecutionConfig();
- if(closureCleanerEnabled) {
- config.enableClosureCleaner();
- } else {
- config.disableClosureCleaner();
- }
- if(forceAvroEnabled) {
- config.enableForceAvro();
- } else {
- config.disableForceAvro();
- }
- if(forceKryoEnabled) {
- config.enableForceKryo();
- } else {
- config.disableForceKryo();
- }
- if(objectReuseEnabled) {
- config.enableObjectReuse();
- } else {
- config.disableObjectReuse();
- }
- if(sysoutLoggingEnabled) {
- config.enableSysoutLogging();
- } else {
- config.disableSysoutLogging();
- }
- config.setParallelism(dop);
-
- JobGraph jobGraph = compiler.createJobGraph("test");
- ExecutionConfig executionConfig = (ExecutionConfig) InstantiationUtil.readObjectFromConfig(
- jobGraph.getJobConfiguration(),
- ExecutionConfig.CONFIG_KEY,
- Thread.currentThread().getContextClassLoader());
-
- Assert.assertEquals(closureCleanerEnabled, executionConfig.isClosureCleanerEnabled());
- Assert.assertEquals(forceAvroEnabled, executionConfig.isForceAvroEnabled());
- Assert.assertEquals(forceKryoEnabled, executionConfig.isForceKryoEnabled());
- Assert.assertEquals(objectReuseEnabled, executionConfig.isObjectReuseEnabled());
- Assert.assertEquals(sysoutLoggingEnabled, executionConfig.isSysoutLoggingEnabled());
- Assert.assertEquals(dop, executionConfig.getParallelism());
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamCounterTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamCounterTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamCounterTest.java
deleted file mode 100644
index dc8024c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamCounterTest.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.junit.Test;
-
-/**
- * Tests for {@link StreamCounter}. These test that:
- *
- * <ul>
- * <li>Timestamps of processed elements match the input timestamp</li>
- * <li>Watermarks are correctly forwarded</li>
- * </ul>
- */
-public class StreamCounterTest {
-
- @Test
- public void testCount() throws Exception {
- StreamCounter<String> operator = new StreamCounter<String>();
-
- OneInputStreamOperatorTestHarness<String, Long> testHarness = new OneInputStreamOperatorTestHarness<String, Long>(operator);
-
- long initialTime = 0L;
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
-
- testHarness.open();
-
- testHarness.processElement(new StreamRecord<String>("eins", initialTime + 1));
- testHarness.processElement(new StreamRecord<String>("zwei", initialTime + 2));
- testHarness.processWatermark(new Watermark(initialTime + 2));
- testHarness.processElement(new StreamRecord<String>("drei", initialTime + 3));
-
- expectedOutput.add(new StreamRecord<Long>(1L, initialTime + 1));
- expectedOutput.add(new StreamRecord<Long>(2L, initialTime + 2));
- expectedOutput.add(new Watermark(initialTime + 2));
- expectedOutput.add(new StreamRecord<Long>(3L, initialTime + 3));
-
- TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFilterTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFilterTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFilterTest.java
deleted file mode 100644
index 047aad8..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFilterTest.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.RichFilterFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Tests for {@link StreamFilter}. These test that:
- *
- * <ul>
- * <li>RichFunction methods are called correctly</li>
- * <li>Timestamps of processed elements match the input timestamp</li>
- * <li>Watermarks are correctly forwarded</li>
- * </ul>
- */
-public class StreamFilterTest {
-
- static class MyFilter implements FilterFunction<Integer> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public boolean filter(Integer value) throws Exception {
- return value % 2 == 0;
- }
- }
-
- @Test
- @SuppressWarnings("unchecked")
- public void testFilter() throws Exception {
- StreamFilter<Integer> operator = new StreamFilter<Integer>(new MyFilter());
-
- OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = new OneInputStreamOperatorTestHarness<Integer, Integer>(operator);
-
- long initialTime = 0L;
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
-
- testHarness.open();
-
- testHarness.processElement(new StreamRecord<Integer>(1, initialTime + 1));
- testHarness.processElement(new StreamRecord<Integer>(2, initialTime + 2));
- testHarness.processWatermark(new Watermark(initialTime + 2));
- testHarness.processElement(new StreamRecord<Integer>(3, initialTime + 3));
- testHarness.processElement(new StreamRecord<Integer>(4, initialTime + 4));
- testHarness.processElement(new StreamRecord<Integer>(5, initialTime + 5));
- testHarness.processElement(new StreamRecord<Integer>(6, initialTime + 6));
- testHarness.processElement(new StreamRecord<Integer>(7, initialTime + 7));
-
- expectedOutput.add(new StreamRecord<Integer>(2, initialTime + 2));
- expectedOutput.add(new Watermark(initialTime + 2));
- expectedOutput.add(new StreamRecord<Integer>(4, initialTime + 4));
- expectedOutput.add(new StreamRecord<Integer>(6, initialTime + 6));
-
- TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
- }
-
- @Test
- public void testOpenClose() throws Exception {
- StreamFilter<String> operator = new StreamFilter<String>(new TestOpenCloseFilterFunction());
-
- OneInputStreamOperatorTestHarness<String, String> testHarness = new OneInputStreamOperatorTestHarness<String, String>(operator);
-
- long initialTime = 0L;
-
- testHarness.open();
-
- testHarness.processElement(new StreamRecord<String>("fooHello", initialTime));
- testHarness.processElement(new StreamRecord<String>("bar", initialTime));
-
- testHarness.close();
-
- Assert.assertTrue("RichFunction methods where not called.", TestOpenCloseFilterFunction.closeCalled);
- Assert.assertTrue("Output contains no elements.", testHarness.getOutput().size() > 0);
- }
-
- // This must only be used in one test, otherwise the static fields will be changed
- // by several tests concurrently
- private static class TestOpenCloseFilterFunction extends RichFilterFunction<String> {
- private static final long serialVersionUID = 1L;
-
- public static boolean openCalled = false;
- public static boolean closeCalled = false;
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- if (closeCalled) {
- Assert.fail("Close called before open.");
- }
- openCalled = true;
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- if (!openCalled) {
- Assert.fail("Open was not called before close.");
- }
- closeCalled = true;
- }
-
- @Override
- public boolean filter(String value) throws Exception {
- if (!openCalled) {
- Assert.fail("Open was not called before run.");
- }
- return value.startsWith("foo");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFlatMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFlatMapTest.java
deleted file mode 100644
index e4e29c1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFlatMapTest.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Tests for {@link StreamMap}. These test that:
- *
- * <ul>
- * <li>RichFunction methods are called correctly</li>
- * <li>Timestamps of processed elements match the input timestamp</li>
- * <li>Watermarks are correctly forwarded</li>
- * </ul>
- */
-public class StreamFlatMapTest {
-
- public static final class MyFlatMap implements FlatMapFunction<Integer, Integer> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void flatMap(Integer value, Collector<Integer> out) throws Exception {
- if (value % 2 == 0) {
- out.collect(value);
- out.collect(value * value);
- }
- }
- }
-
- @Test
- public void testFlatMap() throws Exception {
- StreamFlatMap<Integer, Integer> operator = new StreamFlatMap<Integer, Integer>(new MyFlatMap());
-
- OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = new OneInputStreamOperatorTestHarness<Integer, Integer>(operator);
-
- long initialTime = 0L;
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
-
- testHarness.open();
-
- testHarness.processElement(new StreamRecord<Integer>(1, initialTime + 1));
- testHarness.processElement(new StreamRecord<Integer>(2, initialTime + 2));
- testHarness.processWatermark(new Watermark(initialTime + 2));
- testHarness.processElement(new StreamRecord<Integer>(3, initialTime + 3));
- testHarness.processElement(new StreamRecord<Integer>(4, initialTime + 4));
- testHarness.processElement(new StreamRecord<Integer>(5, initialTime + 5));
- testHarness.processElement(new StreamRecord<Integer>(6, initialTime + 6));
- testHarness.processElement(new StreamRecord<Integer>(7, initialTime + 7));
- testHarness.processElement(new StreamRecord<Integer>(8, initialTime + 8));
-
- expectedOutput.add(new StreamRecord<Integer>(2, initialTime + 2));
- expectedOutput.add(new StreamRecord<Integer>(4, initialTime + 2));
- expectedOutput.add(new Watermark(initialTime + 2));
- expectedOutput.add(new StreamRecord<Integer>(4, initialTime + 4));
- expectedOutput.add(new StreamRecord<Integer>(16, initialTime + 4));
- expectedOutput.add(new StreamRecord<Integer>(6, initialTime + 6));
- expectedOutput.add(new StreamRecord<Integer>(36, initialTime + 6));
- expectedOutput.add(new StreamRecord<Integer>(8, initialTime + 8));
- expectedOutput.add(new StreamRecord<Integer>(64, initialTime + 8));
-
- TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
- }
-
- @Test
- public void testOpenClose() throws Exception {
- StreamFlatMap<String, String> operator = new StreamFlatMap<String, String>(new TestOpenCloseFlatMapFunction());
-
- OneInputStreamOperatorTestHarness<String, String> testHarness = new OneInputStreamOperatorTestHarness<String, String>(operator);
-
- long initialTime = 0L;
-
- testHarness.open();
-
- testHarness.processElement(new StreamRecord<String>("Hello", initialTime));
-
- testHarness.close();
-
- Assert.assertTrue("RichFunction methods where not called.", TestOpenCloseFlatMapFunction.closeCalled);
- Assert.assertTrue("Output contains no elements.", testHarness.getOutput().size() > 0);
- }
-
- // This must only be used in one test, otherwise the static fields will be changed
- // by several tests concurrently
- private static class TestOpenCloseFlatMapFunction extends RichFlatMapFunction<String, String> {
- private static final long serialVersionUID = 1L;
-
- public static boolean openCalled = false;
- public static boolean closeCalled = false;
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- if (closeCalled) {
- Assert.fail("Close called before open.");
- }
- openCalled = true;
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- if (!openCalled) {
- Assert.fail("Open was not called before close.");
- }
- closeCalled = true;
- }
-
- @Override
- public void flatMap(String value, Collector<String> out) throws Exception {
- if (!openCalled) {
- Assert.fail("Open was not called before run.");
- }
- out.collect(value);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
deleted file mode 100644
index f6e7e6b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.functions.RichFoldFunction;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for {@link StreamGroupedFold}. These test that:
- *
- * <ul>
- * <li>RichFunction methods are called correctly</li>
- * <li>Timestamps of processed elements match the input timestamp</li>
- * <li>Watermarks are correctly forwarded</li>
- * </ul>
- */
-@SuppressWarnings("serial")
-public class StreamGroupedFoldTest {
-
- private static class MyFolder implements FoldFunction<Integer, String> {
-
- @Override
- public String fold(String accumulator, Integer value) throws Exception {
- return accumulator + value.toString();
- }
- }
-
- @Test
- public void testGroupedFold() throws Exception {
-
- KeySelector<Integer, String> keySelector = new KeySelector<Integer, String>() {
-
- @Override
- public String getKey(Integer value) {
- return value.toString();
- }
- };
-
- StreamGroupedFold<Integer, String, String> operator = new StreamGroupedFold<>(new MyFolder(), "100");
- operator.setOutputType(BasicTypeInfo.STRING_TYPE_INFO, new ExecutionConfig());
-
- OneInputStreamOperatorTestHarness<Integer, String> testHarness = new OneInputStreamOperatorTestHarness<>(operator);
- testHarness.configureForKeyedStream(keySelector, BasicTypeInfo.STRING_TYPE_INFO);
-
- long initialTime = 0L;
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
- testHarness.open();
-
- testHarness.processElement(new StreamRecord<>(1, initialTime + 1));
- testHarness.processElement(new StreamRecord<>(1, initialTime + 2));
- testHarness.processWatermark(new Watermark(initialTime + 2));
- testHarness.processElement(new StreamRecord<>(2, initialTime + 3));
- testHarness.processElement(new StreamRecord<>(2, initialTime + 4));
- testHarness.processElement(new StreamRecord<>(3, initialTime + 5));
-
- expectedOutput.add(new StreamRecord<>("1001", initialTime + 1));
- expectedOutput.add(new StreamRecord<>("10011", initialTime + 2));
- expectedOutput.add(new Watermark(initialTime + 2));
- expectedOutput.add(new StreamRecord<>("1002", initialTime + 3));
- expectedOutput.add(new StreamRecord<>("10022", initialTime + 4));
- expectedOutput.add(new StreamRecord<>("1003", initialTime + 5));
-
- TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
- }
-
- @Test
- public void testOpenClose() throws Exception {
- KeySelector<Integer, Integer> keySelector = new KeySelector<Integer, Integer>() {
- @Override
- public Integer getKey(Integer value) {
- return value;
- }
- };
-
- StreamGroupedFold<Integer, String, Integer> operator = new StreamGroupedFold<>(
- new TestOpenCloseFoldFunction(), "init");
- operator.setOutputType(BasicTypeInfo.STRING_TYPE_INFO, new ExecutionConfig());
-
- OneInputStreamOperatorTestHarness<Integer, String> testHarness = new OneInputStreamOperatorTestHarness<>(operator);
- testHarness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
-
-
- long initialTime = 0L;
-
- testHarness.open();
-
- testHarness.processElement(new StreamRecord<>(1, initialTime));
- testHarness.processElement(new StreamRecord<>(2, initialTime));
-
- testHarness.close();
-
- assertTrue("RichFunction methods where not called.", TestOpenCloseFoldFunction.closeCalled);
- assertTrue("Output contains no elements.", testHarness.getOutput().size() > 0);
- }
-
- // This must only be used in one test, otherwise the static fields will be changed
- // by several tests concurrently
- private static class TestOpenCloseFoldFunction extends RichFoldFunction<Integer, String> {
- private static final long serialVersionUID = 1L;
-
- public static boolean openCalled = false;
- public static boolean closeCalled = false;
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- if (closeCalled) {
- fail("Close called before open.");
- }
- openCalled = true;
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- if (!openCalled) {
- fail("Open was not called before close.");
- }
- closeCalled = true;
- }
-
- @Override
- public String fold(String acc, Integer in) throws Exception {
- if (!openCalled) {
- fail("Open was not called before run.");
- }
- return acc + in;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java
deleted file mode 100644
index 6cb46c9..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.RichReduceFunction;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Tests for {@link StreamGroupedReduce}. These test that:
- *
- * <ul>
- * <li>RichFunction methods are called correctly</li>
- * <li>Timestamps of processed elements match the input timestamp</li>
- * <li>Watermarks are correctly forwarded</li>
- * </ul>
- */
-
-public class StreamGroupedReduceTest {
-
- @Test
- public void testGroupedReduce() throws Exception {
-
- KeySelector<Integer, Integer> keySelector = new IntegerKeySelector();
-
- StreamGroupedReduce<Integer> operator = new StreamGroupedReduce<>(new MyReducer(), IntSerializer.INSTANCE);
-
- OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = new OneInputStreamOperatorTestHarness<>(operator);
- testHarness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
-
- long initialTime = 0L;
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
- testHarness.open();
-
- testHarness.processElement(new StreamRecord<>(1, initialTime + 1));
- testHarness.processElement(new StreamRecord<>(1, initialTime + 2));
- testHarness.processWatermark(new Watermark(initialTime + 2));
- testHarness.processElement(new StreamRecord<>(2, initialTime + 3));
- testHarness.processElement(new StreamRecord<>(2, initialTime + 4));
- testHarness.processElement(new StreamRecord<>(3, initialTime + 5));
-
- expectedOutput.add(new StreamRecord<>(1, initialTime + 1));
- expectedOutput.add(new StreamRecord<>(2, initialTime + 2));
- expectedOutput.add(new Watermark(initialTime + 2));
- expectedOutput.add(new StreamRecord<>(2, initialTime + 3));
- expectedOutput.add(new StreamRecord<>(4, initialTime + 4));
- expectedOutput.add(new StreamRecord<>(3, initialTime + 5));
-
- TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
- }
-
- @Test
- public void testOpenClose() throws Exception {
-
- KeySelector<Integer, Integer> keySelector = new IntegerKeySelector();
-
- StreamGroupedReduce<Integer> operator =
- new StreamGroupedReduce<>(new TestOpenCloseReduceFunction(), IntSerializer.INSTANCE);
- OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = new OneInputStreamOperatorTestHarness<>(operator);
- testHarness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
-
- long initialTime = 0L;
-
- testHarness.open();
-
- testHarness.processElement(new StreamRecord<>(1, initialTime));
- testHarness.processElement(new StreamRecord<>(2, initialTime));
-
- testHarness.close();
-
- Assert.assertTrue("RichFunction methods where not called.", TestOpenCloseReduceFunction.closeCalled);
- Assert.assertTrue("Output contains no elements.", testHarness.getOutput().size() > 0);
- }
-
- // This must only be used in one test, otherwise the static fields will be changed
- // by several tests concurrently
- private static class TestOpenCloseReduceFunction extends RichReduceFunction<Integer> {
- private static final long serialVersionUID = 1L;
-
- public static boolean openCalled = false;
- public static boolean closeCalled = false;
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- if (closeCalled) {
- Assert.fail("Close called before open.");
- }
- openCalled = true;
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- if (!openCalled) {
- Assert.fail("Open was not called before close.");
- }
- closeCalled = true;
- }
-
- @Override
- public Integer reduce(Integer in1, Integer in2) throws Exception {
- if (!openCalled) {
- Assert.fail("Open was not called before run.");
- }
- return in1 + in2;
- }
- }
-
- // Utilities
-
- private static class MyReducer implements ReduceFunction<Integer> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public Integer reduce(Integer value1, Integer value2) throws Exception {
- return value1 + value2;
- }
-
- }
-
- private static class IntegerKeySelector implements KeySelector<Integer, Integer> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Integer getKey(Integer value) throws Exception {
- return value;
- }
- }
-
- private static TypeInformation<Integer> typeInfo = BasicTypeInfo.INT_TYPE_INFO;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java
deleted file mode 100644
index f0113d1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Tests for {@link StreamMap}. These test that:
- *
- * <ul>
- * <li>RichFunction methods are called correctly</li>
- * <li>Timestamps of processed elements match the input timestamp</li>
- * <li>Watermarks are correctly forwarded</li>
- * </ul>
- */
-public class StreamMapTest {
-
- private static class Map implements MapFunction<Integer, String> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public String map(Integer value) throws Exception {
- return "+" + (value + 1);
- }
- }
-
- @Test
- public void testMap() throws Exception {
- StreamMap<Integer, String> operator = new StreamMap<Integer, String>(new Map());
-
- OneInputStreamOperatorTestHarness<Integer, String> testHarness = new OneInputStreamOperatorTestHarness<Integer, String>(operator);
-
- long initialTime = 0L;
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
-
- testHarness.open();
-
- testHarness.processElement(new StreamRecord<Integer>(1, initialTime + 1));
- testHarness.processElement(new StreamRecord<Integer>(2, initialTime + 2));
- testHarness.processWatermark(new Watermark(initialTime + 2));
- testHarness.processElement(new StreamRecord<Integer>(3, initialTime + 3));
-
- expectedOutput.add(new StreamRecord<String>("+2", initialTime + 1));
- expectedOutput.add(new StreamRecord<String>("+3", initialTime + 2));
- expectedOutput.add(new Watermark(initialTime + 2));
- expectedOutput.add(new StreamRecord<String>("+4", initialTime + 3));
-
- TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
- }
-
- @Test
- public void testOpenClose() throws Exception {
- StreamMap<String, String> operator = new StreamMap<String, String>(new TestOpenCloseMapFunction());
-
- OneInputStreamOperatorTestHarness<String, String> testHarness = new OneInputStreamOperatorTestHarness<String, String>(operator);
-
- long initialTime = 0L;
-
- testHarness.open();
-
- testHarness.processElement(new StreamRecord<String>("Hello", initialTime));
-
- testHarness.close();
-
- Assert.assertTrue("RichFunction methods where not called.", TestOpenCloseMapFunction.closeCalled);
- Assert.assertTrue("Output contains no elements.", testHarness.getOutput().size() > 0);
- }
-
- // This must only be used in one test, otherwise the static fields will be changed
- // by several tests concurrently
- private static class TestOpenCloseMapFunction extends RichMapFunction<String, String> {
- private static final long serialVersionUID = 1L;
-
- public static boolean openCalled = false;
- public static boolean closeCalled = false;
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- if (closeCalled) {
- Assert.fail("Close called before open.");
- }
- openCalled = true;
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- if (!openCalled) {
- Assert.fail("Open was not called before close.");
- }
- closeCalled = true;
- }
-
- @Override
- public String map(String value) throws Exception {
- if (!openCalled) {
- Assert.fail("Open was not called before run.");
- }
- return value;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java
deleted file mode 100644
index 14abd18..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.util.HashSet;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
-import org.apache.flink.streaming.api.datastream.StreamProjection;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-
-import org.junit.Test;
-
-/**
- * Tests for {@link StreamProject}. These test that:
- *
- * <ul>
- * <li>Timestamps of processed elements match the input timestamp</li>
- * <li>Watermarks are correctly forwarded</li>
- * </ul>
- */
-public class StreamProjectTest extends StreamingMultipleProgramsTestBase {
-
- @Test
- public void testProject() throws Exception {
-
- TypeInformation<Tuple5<Integer, String, Integer, String, Integer>> inType = TypeExtractor
- .getForObject(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "b", 4));
-
- int[] fields = new int[]{4, 4, 3};
-
- TupleSerializer<Tuple3<Integer, Integer, String>> serializer =
- new TupleTypeInfo<Tuple3<Integer, Integer, String>>(StreamProjection.extractFieldTypes(fields, inType))
- .createSerializer(new ExecutionConfig());
- @SuppressWarnings("unchecked")
- StreamProject<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>> operator =
- new StreamProject<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>>(
- fields, serializer);
-
- OneInputStreamOperatorTestHarness<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>> testHarness = new OneInputStreamOperatorTestHarness<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>>(operator);
-
- long initialTime = 0L;
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
-
- testHarness.open();
-
- testHarness.processElement(new StreamRecord<Tuple5<Integer, String, Integer, String, Integer>>(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "b", 4), initialTime + 1));
- testHarness.processElement(new StreamRecord<Tuple5<Integer, String, Integer, String, Integer>>(new Tuple5<Integer, String, Integer, String, Integer>(2, "s", 3, "c", 2), initialTime + 2));
- testHarness.processElement(new StreamRecord<Tuple5<Integer, String, Integer, String, Integer>>(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "c", 2), initialTime + 3));
- testHarness.processWatermark(new Watermark(initialTime + 2));
- testHarness.processElement(new StreamRecord<Tuple5<Integer, String, Integer, String, Integer>>(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "a", 7), initialTime + 4));
-
- expectedOutput.add(new StreamRecord<Tuple3<Integer, Integer, String>>(new Tuple3<Integer, Integer, String>(4, 4, "b"), initialTime + 1));
- expectedOutput.add(new StreamRecord<Tuple3<Integer, Integer, String>>(new Tuple3<Integer, Integer, String>(2, 2, "c"), initialTime + 2));
- expectedOutput.add(new StreamRecord<Tuple3<Integer, Integer, String>>(new Tuple3<Integer, Integer, String>(2, 2, "c"), initialTime + 3));
- expectedOutput.add(new Watermark(initialTime + 2));
- expectedOutput.add(new StreamRecord<Tuple3<Integer, Integer, String>>(new Tuple3<Integer, Integer, String>(7, 7, "a"), initialTime + 4));
-
- TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
- }
-
-
- // tests using projection from the API without explicitly specifying the types
- private static HashSet<Tuple2<Long, Double>> expected = new HashSet<Tuple2<Long, Double>>();
- private static HashSet<Tuple2<Long, Double>> actual = new HashSet<Tuple2<Long, Double>>();
-
- @Test
- public void APIWithoutTypesTest() {
-
- for (Long i = 1L; i < 11L; i++) {
- expected.add(new Tuple2<Long, Double>(i, i.doubleValue()));
- }
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
-
- env.generateSequence(1, 10).map(new MapFunction<Long, Tuple3<Long, Character, Double>>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Tuple3<Long, Character, Double> map(Long value) throws Exception {
- return new Tuple3<Long, Character, Double>(value, 'c', value.doubleValue());
- }
- })
- .project(0, 2)
- .addSink(new SinkFunction<Tuple>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- @SuppressWarnings("unchecked")
- public void invoke(Tuple value) throws Exception {
- actual.add( (Tuple2<Long,Double>) value);
- }
- });
-
- try {
- env.execute();
- } catch (Exception e) {
- fail(e.getMessage());
- }
-
- assertEquals(expected, actual);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoGroupedReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoGroupedReduceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoGroupedReduceTest.java
deleted file mode 100644
index 39e85e9..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoGroupedReduceTest.java
+++ /dev/null
@@ -1,125 +0,0 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one or more
-// * contributor license agreements. See the NOTICE file distributed with
-// * this work for additional information regarding copyright ownership.
-// * The ASF licenses this file to You under the Apache License, Version 2.0
-// * (the "License"); you may not use this file except in compliance with
-// * the License. You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package org.apache.flink.streaming.api.operators.co;
-//
-//import static org.junit.Assert.assertEquals;
-//
-//import java.util.Arrays;
-//import java.util.List;
-//
-//import org.apache.flink.api.java.functions.KeySelector;
-//import org.apache.flink.api.java.tuple.Tuple2;
-//import org.apache.flink.api.java.tuple.Tuple3;
-//import org.apache.flink.streaming.api.functions.co.CoReduceFunction;
-//import org.apache.flink.streaming.api.operators.co.CoStreamGroupedReduce;
-//import org.apache.flink.streaming.util.MockCoContext;
-//import org.junit.Test;
-//
-//public class CoGroupedReduceTest {
-//
-// private final static class MyCoReduceFunction implements
-// CoReduceFunction<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String> {
-// private static final long serialVersionUID = 1L;
-//
-// @Override
-// public Tuple3<String, String, String> reduce1(Tuple3<String, String, String> value1,
-// Tuple3<String, String, String> value2) {
-// return new Tuple3<String, String, String>(value1.f0, value1.f1 + value2.f1, value1.f2);
-// }
-//
-// @Override
-// public Tuple2<Integer, Integer> reduce2(Tuple2<Integer, Integer> value1,
-// Tuple2<Integer, Integer> value2) {
-// return new Tuple2<Integer, Integer>(value1.f0, value1.f1 + value2.f1);
-// }
-//
-// @Override
-// public String map1(Tuple3<String, String, String> value) {
-// return value.f1;
-// }
-//
-// @Override
-// public String map2(Tuple2<Integer, Integer> value) {
-// return value.f1.toString();
-// }
-// }
-//
-// @SuppressWarnings("unchecked")
-// @Test
-// public void coGroupedReduceTest() {
-// Tuple3<String, String, String> word1 = new Tuple3<String, String, String>("a", "word1", "b");
-// Tuple3<String, String, String> word2 = new Tuple3<String, String, String>("b", "word2", "a");
-// Tuple3<String, String, String> word3 = new Tuple3<String, String, String>("a", "word3", "a");
-// Tuple2<Integer, Integer> int1 = new Tuple2<Integer, Integer>(2, 1);
-// Tuple2<Integer, Integer> int2 = new Tuple2<Integer, Integer>(1, 2);
-// Tuple2<Integer, Integer> int3 = new Tuple2<Integer, Integer>(0, 3);
-// Tuple2<Integer, Integer> int4 = new Tuple2<Integer, Integer>(2, 4);
-// Tuple2<Integer, Integer> int5 = new Tuple2<Integer, Integer>(1, 5);
-//
-// KeySelector<Tuple3<String, String, String>, ?> keySelector0 = new KeySelector<Tuple3<String, String, String>, String>() {
-//
-// private static final long serialVersionUID = 1L;
-//
-// @Override
-// public String getKey(Tuple3<String, String, String> value) throws Exception {
-// return value.f0;
-// }
-// };
-//
-// KeySelector<Tuple2<Integer, Integer>, ?> keySelector1 = new KeySelector<Tuple2<Integer, Integer>, Integer>() {
-//
-// private static final long serialVersionUID = 1L;
-//
-// @Override
-// public Integer getKey(Tuple2<Integer, Integer> value) throws Exception {
-// return value.f0;
-// }
-// };
-//
-// KeySelector<Tuple3<String, String, String>, ?> keySelector2 = new KeySelector<Tuple3<String, String, String>, String>() {
-//
-// private static final long serialVersionUID = 1L;
-//
-// @Override
-// public String getKey(Tuple3<String, String, String> value) throws Exception {
-// return value.f2;
-// }
-// };
-//
-// CoStreamGroupedReduce<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String> invokable = new CoStreamGroupedReduce<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String>(
-// new MyCoReduceFunction(), keySelector0, keySelector1);
-//
-// List<String> expected = Arrays.asList("word1", "1", "word2", "2", "word1word3", "3", "5",
-// "7");
-//
-// List<String> actualList = MockCoContext.createAndExecute(invokable,
-// Arrays.asList(word1, word2, word3), Arrays.asList(int1, int2, int3, int4, int5));
-//
-// assertEquals(expected, actualList);
-//
-// invokable = new CoStreamGroupedReduce<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String>(
-// new MyCoReduceFunction(), keySelector2, keySelector1);
-//
-// expected = Arrays.asList("word1", "1", "word2", "2", "word2word3", "3", "5", "7");
-//
-// actualList = MockCoContext.createAndExecute(invokable, Arrays.asList(word1, word2, word3),
-// Arrays.asList(int1, int2, int3, int4, int5));
-//
-// assertEquals(expected, actualList);
-// }
-//}