You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/09 12:16:40 UTC
[04/10] flink git commit: [FLINK-2780] Remove Old Windowing Logic and API
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromTupleTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromTupleTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromTupleTest.java
new file mode 100644
index 0000000..c05f281
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromTupleTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple10;
+import org.apache.flink.api.java.tuple.Tuple11;
+import org.apache.flink.api.java.tuple.Tuple12;
+import org.apache.flink.api.java.tuple.Tuple13;
+import org.apache.flink.api.java.tuple.Tuple14;
+import org.apache.flink.api.java.tuple.Tuple15;
+import org.apache.flink.api.java.tuple.Tuple16;
+import org.apache.flink.api.java.tuple.Tuple17;
+import org.apache.flink.api.java.tuple.Tuple18;
+import org.apache.flink.api.java.tuple.Tuple19;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple20;
+import org.apache.flink.api.java.tuple.Tuple21;
+import org.apache.flink.api.java.tuple.Tuple22;
+import org.apache.flink.api.java.tuple.Tuple23;
+import org.apache.flink.api.java.tuple.Tuple24;
+import org.apache.flink.api.java.tuple.Tuple25;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.java.tuple.Tuple8;
+import org.apache.flink.api.java.tuple.Tuple9;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests {@link FieldFromTuple}: a single field, selected by position, is read back from
+ * tuples of every arity listed in {@link #CLASSES}.
+ */
+public class FieldFromTupleTest {
+
+    /** Field values; after {@link #init()}, {@code testStrings[i]} is {@code i} as a string. */
+    private String[] testStrings;
+
+    /** Creates one distinct value per field position, up to {@code Tuple.MAX_ARITY}. */
+    @Before
+    public void init() {
+        testStrings = new String[Tuple.MAX_ARITY];
+        for (int i = 0; i < Tuple.MAX_ARITY; i++) {
+            testStrings[i] = Integer.toString(i);
+        }
+    }
+
+    /**
+     * Fills every field of each tuple class and extracts the fields one at a time.
+     *
+     * <p>{@code CLASSES[i]} is the tuple with {@code i + 1} fields, so positions {@code 0..i}
+     * (inclusive) are set and verified. An exclusive bound would skip the last field of every
+     * tuple and would not exercise {@code Tuple1} at all.
+     */
+    @Test
+    public void testSingleFieldExtraction() throws InstantiationException, IllegalAccessException {
+        for (int i = 0; i < Tuple.MAX_ARITY; i++) {
+            Tuple current = (Tuple) CLASSES[i].newInstance();
+            for (int j = 0; j <= i; j++) {
+                current.setField(testStrings[j], j);
+            }
+            for (int j = 0; j <= i; j++) {
+                assertEquals(testStrings[j], new FieldFromTuple<String>(j).extract(current));
+            }
+        }
+    }
+
+    /** Tuple classes in ascending order: {@code CLASSES[i]} is {@code Tuple(i + 1)}. */
+    private static final Class<?>[] CLASSES = new Class<?>[] { Tuple1.class, Tuple2.class,
+            Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class,
+            Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class,
+            Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class,
+            Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class,
+            Tuple24.class, Tuple25.class };
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromArrayTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromArrayTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromArrayTest.java
new file mode 100644
index 0000000..7a9a716
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromArrayTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.flink.streaming.api.functions.windowing.delta.extractor.FieldsFromArray;
+import org.junit.Test;
+
+/**
+ * Tests {@link FieldsFromArray} on {@code String[]}, {@code Integer[]} and primitive
+ * {@code int[]} sources: single positions, the whole array reversed, and a reordered subset.
+ */
+public class FieldsFromArrayTest {
+
+    String[] testStringArray = { "0", "1", "2", "3", "4" };
+    Integer[] testIntegerArray = { 10, 11, 12, 13, 14 };
+    int[] testIntArray = { 20, 21, 22, 23, 24 };
+
+    /** Extraction from a {@code String[]} source. */
+    @Test
+    public void testStringArray() {
+        final int size = testStringArray.length;
+
+        // every position on its own
+        for (int pos = 0; pos < size; pos++) {
+            String[] single = new String[] { testStringArray[pos] };
+            Object[] extracted =
+                    new FieldsFromArray<String>(String.class, pos).extract(testStringArray);
+            arrayEqualityCheck(single, extracted);
+        }
+
+        // all positions, last to first
+        String[] reversed = new String[size];
+        for (int src = size - 1; src >= 0; src--) {
+            reversed[size - 1 - src] = testStringArray[src];
+        }
+        Object[] extractedReversed =
+                new FieldsFromArray<String>(String.class, 4, 3, 2, 1, 0).extract(testStringArray);
+        arrayEqualityCheck(reversed, extractedReversed);
+
+        // a subset in shuffled order
+        String[] picked = { testStringArray[4], testStringArray[1], testStringArray[2] };
+        Object[] extractedPicked =
+                new FieldsFromArray<String>(String.class, 4, 1, 2).extract(testStringArray);
+        arrayEqualityCheck(picked, extractedPicked);
+    }
+
+    /** Extraction from an {@code Integer[]} source. */
+    @Test
+    public void testIntegerArray() {
+        final int size = testIntegerArray.length;
+
+        // every position on its own
+        for (int pos = 0; pos < size; pos++) {
+            Integer[] single = new Integer[] { testIntegerArray[pos] };
+            Object[] extracted =
+                    new FieldsFromArray<Integer>(Integer.class, pos).extract(testIntegerArray);
+            arrayEqualityCheck(single, extracted);
+        }
+
+        // all positions, last to first
+        Integer[] reversed = new Integer[size];
+        for (int src = size - 1; src >= 0; src--) {
+            reversed[size - 1 - src] = testIntegerArray[src];
+        }
+        Object[] extractedReversed =
+                new FieldsFromArray<Integer>(Integer.class, 4, 3, 2, 1, 0).extract(testIntegerArray);
+        arrayEqualityCheck(reversed, extractedReversed);
+
+        // a subset in shuffled order
+        Integer[] picked = { testIntegerArray[4], testIntegerArray[1], testIntegerArray[2] };
+        Object[] extractedPicked =
+                new FieldsFromArray<Integer>(Integer.class, 4, 1, 2).extract(testIntegerArray);
+        arrayEqualityCheck(picked, extractedPicked);
+    }
+
+    /** Extraction from a primitive {@code int[]} source; expected values are boxed. */
+    @Test
+    public void testIntArray() {
+        final int size = testIntArray.length;
+
+        // every position on its own
+        for (int pos = 0; pos < size; pos++) {
+            Integer[] single = new Integer[] { testIntArray[pos] };
+            Object[] extracted =
+                    new FieldsFromArray<Integer>(Integer.class, pos).extract(testIntArray);
+            arrayEqualityCheck(single, extracted);
+        }
+
+        // all positions, last to first
+        Integer[] reversed = new Integer[size];
+        for (int src = size - 1; src >= 0; src--) {
+            reversed[size - 1 - src] = testIntArray[src];
+        }
+        Object[] extractedReversed =
+                new FieldsFromArray<Integer>(Integer.class, 4, 3, 2, 1, 0).extract(testIntArray);
+        arrayEqualityCheck(reversed, extractedReversed);
+
+        // a subset in shuffled order
+        Integer[] picked = { testIntArray[4], testIntArray[1], testIntArray[2] };
+        Object[] extractedPicked =
+                new FieldsFromArray<Integer>(Integer.class, 4, 1, 2).extract(testIntArray);
+        arrayEqualityCheck(picked, extractedPicked);
+    }
+
+    /**
+     * Asserts that both arrays have the same length and equal elements at every position.
+     *
+     * @param array1 the expected elements
+     * @param array2 the elements returned by the extractor
+     */
+    private void arrayEqualityCheck(Object[] array1, Object[] array2) {
+        assertEquals("The result arrays must have the same length", array1.length, array2.length);
+        int index = 0;
+        for (Object expectedElement : array1) {
+            assertEquals("Unequal fields at position " + index, expectedElement, array2[index]);
+            index++;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromTupleTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromTupleTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromTupleTest.java
new file mode 100644
index 0000000..025ed8a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromTupleTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple10;
+import org.apache.flink.api.java.tuple.Tuple11;
+import org.apache.flink.api.java.tuple.Tuple12;
+import org.apache.flink.api.java.tuple.Tuple13;
+import org.apache.flink.api.java.tuple.Tuple14;
+import org.apache.flink.api.java.tuple.Tuple15;
+import org.apache.flink.api.java.tuple.Tuple16;
+import org.apache.flink.api.java.tuple.Tuple17;
+import org.apache.flink.api.java.tuple.Tuple18;
+import org.apache.flink.api.java.tuple.Tuple19;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple20;
+import org.apache.flink.api.java.tuple.Tuple21;
+import org.apache.flink.api.java.tuple.Tuple22;
+import org.apache.flink.api.java.tuple.Tuple23;
+import org.apache.flink.api.java.tuple.Tuple24;
+import org.apache.flink.api.java.tuple.Tuple25;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.java.tuple.Tuple8;
+import org.apache.flink.api.java.tuple.Tuple9;
+import org.apache.flink.streaming.api.functions.windowing.delta.extractor.FieldsFromTuple;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests {@link FieldsFromTuple}: several fields, selected by position and in caller-defined
+ * order, are extracted from a tuple into a {@code double[]}.
+ */
+public class FieldsFromTupleTest {
+
+    /** Field values; after {@link #init()}, {@code testDouble[i] == i}. */
+    private double[] testDouble;
+
+    /** Creates one distinct value per field position, up to {@code Tuple.MAX_ARITY}. */
+    @Before
+    public void init() {
+        testDouble = new double[Tuple.MAX_ARITY];
+        for (int pos = 0; pos < testDouble.length; pos++) {
+            testDouble[pos] = pos;
+        }
+    }
+
+    /**
+     * Extracts differently ordered position lists, including repeated positions, from a tuple
+     * whose field {@code i} holds {@code testDouble[i]}.
+     */
+    @Test
+    public void testUserSpecifiedOrder() throws InstantiationException, IllegalAccessException {
+        final int lastPosition = Tuple.MAX_ARITY - 1;
+
+        Tuple currentTuple = (Tuple) CLASSES[lastPosition].newInstance();
+        for (int pos = 0; pos <= lastPosition; pos++) {
+            currentTuple.setField(testDouble[pos], pos);
+        }
+
+        checkExtraction(currentTuple, 5, 3, 6, 7, 0);
+        checkExtraction(currentTuple, 0, lastPosition);
+        checkExtraction(currentTuple, lastPosition, 0);
+        checkExtraction(currentTuple, 13, 4, 5, 4, 2, 8, 6, 2, 8, 3, 5, 2, 16, 4, 3, 2, 6, 4, 7, 4,
+                2, 8, 7, 2);
+    }
+
+    /**
+     * Extracts the given positions from {@code tuple} and compares the result with the
+     * {@link #testDouble} values at those same positions.
+     */
+    private void checkExtraction(Tuple tuple, int... positions) {
+        double[] expected = new double[positions.length];
+        for (int k = 0; k < positions.length; k++) {
+            expected[k] = testDouble[positions[k]];
+        }
+        arrayEqualityCheck(expected, new FieldsFromTuple(positions).extract(tuple));
+    }
+
+    /**
+     * Asserts that both arrays have the same length and exactly equal values at every position.
+     *
+     * @param array1 the expected values
+     * @param array2 the values returned by the extractor
+     */
+    private void arrayEqualityCheck(double[] array1, double[] array2) {
+        assertEquals("The result arrays must have the same length", array1.length, array2.length);
+        int index = 0;
+        for (double expectedValue : array1) {
+            assertEquals("Unequal fields at position " + index, expectedValue, array2[index], 0d);
+            index++;
+        }
+    }
+
+    /** Tuple classes in ascending order: {@code CLASSES[i]} is {@code Tuple(i + 1)}. */
+    private static final Class<?>[] CLASSES = new Class<?>[] { Tuple1.class, Tuple2.class,
+            Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class,
+            Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class,
+            Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class,
+            Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class,
+            Tuple24.class, Tuple25.class };
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
index c116c01..d00dc67 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
@@ -18,9 +18,7 @@
package org.apache.flink.streaming.api.operators.co;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -29,17 +27,12 @@ import java.util.List;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.streaming.util.TestListResultSink;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.util.Collector;
-import org.junit.Ignore;
import org.junit.Test;
public class SelfConnectionTest extends StreamingMultipleProgramsTestBase {
@@ -184,15 +177,4 @@ public class SelfConnectionTest extends StreamingMultipleProgramsTestBase {
assertEquals(expected, result);
}
-
- private static class IntegerTimestamp implements Timestamp<Integer> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public long getTimestamp(Integer value) {
- return value;
- }
-
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizerTest.java
deleted file mode 100644
index c464b47..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizerTest.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.operators.windowing.GroupedStreamDiscretizer;
-import org.apache.flink.streaming.api.operators.windowing.GroupedWindowBuffer;
-import org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.WindowEvent;
-import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer;
-import org.apache.flink.streaming.util.MockContext;
-import org.junit.Test;
-
-public class GroupedStreamDiscretizerTest {
-
- KeySelector<Tuple2<Integer, String>, ?> keySelector = new KeySelector<Tuple2<Integer, String>, String>() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public String getKey(Tuple2<Integer, String> value) throws Exception {
- return value.f1;
- }
- };
-
- /**
- * Test for not active distributed triggers with single field
- */
- @Test
- public void groupedDiscretizerTest() {
-
- List<Integer> inputs = new ArrayList<Integer>();
- inputs.add(1);
- inputs.add(2);
- inputs.add(2);
- inputs.add(3);
- inputs.add(4);
- inputs.add(5);
- inputs.add(10);
- inputs.add(11);
- inputs.add(11);
-
- Set<StreamWindow<Integer>> expected = new HashSet<StreamWindow<Integer>>();
- expected.add(StreamWindow.fromElements(2, 2));
- expected.add(StreamWindow.fromElements(1, 3));
- expected.add(StreamWindow.fromElements(5, 11));
- expected.add(StreamWindow.fromElements(4, 10));
- expected.add(StreamWindow.fromElements(11));
-
- KeySelector<Integer, Integer> keySelector = new KeySelector<Integer, Integer>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Integer getKey(Integer value) {
- return value % 2;
- }
- };
-
- CloneableTriggerPolicy<Integer> trigger = new CountTriggerPolicy<Integer>(2);
- CloneableEvictionPolicy<Integer> eviction = new TumblingEvictionPolicy<Integer>();
-
- GroupedStreamDiscretizer<Integer> discretizer = new GroupedStreamDiscretizer<Integer>(
- keySelector, trigger, eviction);
-
- StreamWindowBuffer<Integer> buffer = new GroupedWindowBuffer<Integer>(
- new BasicWindowBuffer<Integer>(), keySelector);
-
- List<WindowEvent<Integer>> bufferEvents = MockContext.createAndExecute(discretizer,
- inputs);
- List<StreamWindow<Integer>> result = MockContext.createAndExecute(buffer, bufferEvents);
-
- assertEquals(expected, new HashSet<StreamWindow<Integer>>(result));
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeITCase.java
deleted file mode 100644
index 2c06c00..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeITCase.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.windowing.helper.Time;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Tests that {@link ParallelMerge} does not swallow records of the
- * last window.
- */
-public class ParallelMergeITCase extends StreamingProgramTestBase {
-
- protected String textPath;
- protected String resultPath;
- protected final String input = "To be, or not to be,--that is the question:--" +
- "Whether 'tis nobler in the mind to suffer";
-
- @Override
- protected void preSubmit() throws Exception {
- textPath = createTempFile("text.txt", input);
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- List<String> resultLines = new ArrayList<>();
- readAllResultLines(resultLines, resultPath);
-
- // check that result lines are not swallowed, as every element is expected to be in the
- // last time window we either get the right output or no output at all
- if (resultLines.isEmpty()){
- Assert.fail();
- }
- }
-
- @Override
- protected void testProgram() throws Exception {
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStream<String> text = env.fromElements(input);
-
- DataStream<Tuple2<String, Integer>> counts =
- text.flatMap(new Tokenizer())
- .window(Time.of(1000, TimeUnit.MILLISECONDS))
- .keyBy(0)
- .sum(1)
- .flatten();
-
- counts.writeAsText(resultPath);
-
- try {
- env.execute();
- } catch (RuntimeException e){
- // might happen at closing the active window
- // do nothing
- }
- }
-
- public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void flatMap(String value, Collector<Tuple2<String, Integer>> out)
- throws Exception {
- String[] tokens = value.toLowerCase().split("\\W+");
-
- for (String token : tokens) {
- if (token.length() > 0) {
- out.collect(Tuple2.of(token, 1));
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeTest.java
deleted file mode 100644
index f111890..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeTest.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.List;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput;
-import org.junit.Test;
-
-public class ParallelMergeTest {
-
- @Test
- public void nonGroupedTest() throws Exception {
-
- ReduceFunction<Integer> reducer = new ReduceFunction<Integer>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Integer reduce(Integer a, Integer b) throws Exception {
- return a + b;
- }
- };
-
- TestOutput<StreamWindow<Integer>> output = new TestOutput<StreamWindow<Integer>>();
- TimestampedCollector<StreamWindow<Integer>> collector = new TimestampedCollector<StreamWindow<Integer>>(output);
- List<StreamWindow<Integer>> result = output.getCollected();
-
- ParallelMerge<Integer> merger = new ParallelMerge<Integer>(reducer);
- merger.numberOfDiscretizers = 2;
-
- merger.flatMap1(createTestWindow(1), collector);
- merger.flatMap1(createTestWindow(1), collector);
- merger.flatMap2(new Tuple2<Integer, Integer>(1, 1), collector);
- assertTrue(result.isEmpty());
- merger.flatMap2(new Tuple2<Integer, Integer>(1, 1), collector);
- assertEquals(StreamWindow.fromElements(2), result.get(0));
-
- merger.flatMap2(new Tuple2<Integer, Integer>(2, 2), collector);
- merger.flatMap1(createTestWindow(2), collector);
- merger.flatMap1(createTestWindow(2), collector);
- merger.flatMap2(new Tuple2<Integer, Integer>(2, 1), collector);
- assertEquals(1, result.size());
- merger.flatMap1(createTestWindow(2), collector);
- assertEquals(StreamWindow.fromElements(3), result.get(1));
-
- // check error handling
- merger.flatMap1(createTestWindow(3), collector);
- merger.flatMap2(new Tuple2<Integer, Integer>(3, 1), collector);
- merger.flatMap2(new Tuple2<Integer, Integer>(3, 1), collector);
-
- merger.flatMap2(new Tuple2<Integer, Integer>(4, 1), collector);
- merger.flatMap2(new Tuple2<Integer, Integer>(4, 1), collector);
- merger.flatMap1(createTestWindow(4), collector);
- try {
- merger.flatMap1(createTestWindow(4), collector);
- fail();
- } catch (RuntimeException e) {
- // Do nothing
- }
-
- ParallelMerge<Integer> merger2 = new ParallelMerge<Integer>(reducer);
- merger2.numberOfDiscretizers = 2;
- merger2.flatMap1(createTestWindow(0), collector);
- merger2.flatMap1(createTestWindow(1), collector);
- merger2.flatMap1(createTestWindow(1), collector);
- merger2.flatMap2(new Tuple2<Integer, Integer>(1, 1), collector);
- try {
- merger2.flatMap2(new Tuple2<Integer, Integer>(1, 1), collector);
- fail();
- } catch (RuntimeException e) {
- // Do nothing
- }
-
- }
-
- @Test
- public void groupedTest() throws Exception {
-
- TestOutput<StreamWindow<Integer>> output = new TestOutput<StreamWindow<Integer>>();
- TimestampedCollector<StreamWindow<Integer>> collector = new TimestampedCollector<StreamWindow<Integer>>(output);
- List<StreamWindow<Integer>> result = output.getCollected();
-
- ParallelMerge<Integer> merger = new ParallelGroupedMerge<Integer>();
- merger.numberOfDiscretizers = 2;
-
- merger.flatMap1(createTestWindow(1), collector);
- merger.flatMap1(createTestWindow(1), collector);
- merger.flatMap2(new Tuple2<Integer, Integer>(1, 1), collector);
- assertTrue(result.isEmpty());
- merger.flatMap2(new Tuple2<Integer, Integer>(1, 1), collector);
- assertEquals(StreamWindow.fromElements(1, 1), result.get(0));
- }
-
- private StreamWindow<Integer> createTestWindow(Integer id) {
- StreamWindow<Integer> ret = new StreamWindow<Integer>(id);
- ret.add(1);
- return ret;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizerTest.java
deleted file mode 100644
index 24251f1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizerTest.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer;
-import org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.WindowEvent;
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer;
-import org.apache.flink.streaming.util.MockContext;
-import org.junit.Test;
-
-public class StreamDiscretizerTest {
-
-
- @Test
- public void testDiscretizer() {
-
- List<Integer> inputs = new ArrayList<Integer>();
- inputs.add(1);
- inputs.add(2);
- inputs.add(2);
- inputs.add(3);
- inputs.add(4);
- inputs.add(5);
- inputs.add(10);
- inputs.add(11);
- inputs.add(11);
-
- List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
- expected.add(StreamWindow.fromElements(1, 2, 2, 3, 4));
- expected.add(StreamWindow.fromElements(3, 4, 5));
- expected.add(StreamWindow.fromElements(5));
- expected.add(StreamWindow.fromElements(10));
- expected.add(StreamWindow.fromElements(10, 11, 11));
-
- Timestamp<Integer> myTimeStamp = new Timestamp<Integer>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public long getTimestamp(Integer value) {
- return value;
- }
- };
-
- TriggerPolicy<Integer> trigger = new TimeTriggerPolicy<Integer>(2L,
- new TimestampWrapper<Integer>(myTimeStamp, 3));
-
- EvictionPolicy<Integer> eviction = new TimeEvictionPolicy<Integer>(4L,
- new TimestampWrapper<Integer>(myTimeStamp, 1));
-
-
-
- StreamDiscretizer<Integer> discretizer = new StreamDiscretizer<Integer>(trigger, eviction);
- StreamWindowBuffer<Integer> buffer = new StreamWindowBuffer<Integer>(new BasicWindowBuffer<Integer>());
-
- List<WindowEvent<Integer>> bufferEvents = MockContext.createAndExecute(discretizer, inputs);
- List<StreamWindow<Integer>> result = MockContext.createAndExecute(buffer, bufferEvents);
-
- assertEquals(expected, result);
- }
-
- @Test
- public void testDiscretizer2() {
-
- List<Integer> inputs = new ArrayList<Integer>();
- inputs.add(1);
- inputs.add(2);
- inputs.add(2);
- inputs.add(3);
- inputs.add(4);
-
- List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
- expected.add(StreamWindow.fromElements(1, 2));
- expected.add(StreamWindow.fromElements(2, 3));
- expected.add(StreamWindow.fromElements(4));
-
- TriggerPolicy<Integer> trigger = new CountTriggerPolicy<Integer>(2);
-
- EvictionPolicy<Integer> eviction = new TumblingEvictionPolicy<Integer>();
-
- StreamDiscretizer<Integer> discretizer = new StreamDiscretizer<Integer>(trigger, eviction);
- StreamWindowBuffer<Integer> buffer = new StreamWindowBuffer<Integer>(new BasicWindowBuffer<Integer>());
-
- List<WindowEvent<Integer>> bufferEvents = MockContext.createAndExecute(discretizer, inputs);
- List<StreamWindow<Integer>> result = MockContext.createAndExecute(buffer, bufferEvents);
- assertEquals(expected, result);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattenerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattenerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattenerTest.java
deleted file mode 100644
index dc6d0d6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattenerTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.util.MockContext;
-import org.junit.Test;
-
-public class WindowFlattenerTest {
-
- @Test
- public void test() {
- OneInputStreamOperator<StreamWindow<Integer>, Integer> flattener = new WindowFlattener<Integer>();
-
- StreamWindow<Integer> w1 = StreamWindow.fromElements(1, 2, 3);
- StreamWindow<Integer> w2 = new StreamWindow<Integer>();
-
- List<StreamWindow<Integer>> input = new ArrayList<StreamWindow<Integer>>();
- input.add(w1);
- input.add(w2);
-
- List<Integer> expected = new ArrayList<Integer>();
- expected.addAll(w1);
- expected.addAll(w2);
-
- List<Integer> output = MockContext.createAndExecute(flattener, input);
-
- assertEquals(expected, output);
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowFolderTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowFolderTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowFolderTest.java
deleted file mode 100644
index 3b54069..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowFolderTest.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.util.MockContext;
-import org.junit.Test;
-
-public class WindowFolderTest {
-
- @Test
- public void test() {
- OneInputStreamOperator<StreamWindow<Integer>, StreamWindow<String>> windowReducer = new WindowFolder<Integer,String>(
- new FoldFunction<Integer, String>() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public String fold(String accumulator, Integer value) throws Exception {
- return accumulator + value.toString();
- }
- }, "");
-
- List<StreamWindow<Integer>> input = new ArrayList<StreamWindow<Integer>>();
- input.add(StreamWindow.fromElements(1, 2, 3));
- input.add(new StreamWindow<Integer>());
- input.add(StreamWindow.fromElements(-1));
-
- List<StreamWindow<String>> expected = new ArrayList<StreamWindow<String>>();
- expected.add(StreamWindow.fromElements("123"));
- expected.add(new StreamWindow<String>());
- expected.add(StreamWindow.fromElements("-1"));
-
- List<StreamWindow<String>> output = MockContext.createAndExecute(windowReducer, input);
-
- assertEquals(expected, output);
-
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowMapperTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowMapperTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowMapperTest.java
deleted file mode 100644
index 9836a99..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowMapperTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.streaming.api.functions.WindowMapFunction;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.util.MockContext;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-public class WindowMapperTest {
-
- @Test
- public void test() {
- OneInputStreamOperator<StreamWindow<Integer>, StreamWindow<Integer>> windowMapper = new WindowMapper<Integer, Integer>(
- new WindowMapFunction<Integer, Integer>() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void mapWindow(Iterable<Integer> values, Collector<Integer> out)
- throws Exception {
- for (Integer v : values) {
- out.collect(v);
- }
- }
- });
-
- List<StreamWindow<Integer>> input = new ArrayList<StreamWindow<Integer>>();
- input.add(StreamWindow.fromElements(1, 2, 3));
- input.add(new StreamWindow<Integer>());
-
- List<StreamWindow<Integer>> output = MockContext.createAndExecute(windowMapper, input);
-
- assertEquals(input, output);
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowMergerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowMergerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowMergerTest.java
deleted file mode 100644
index 43e3785..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowMergerTest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.util.MockContext;
-import org.junit.Test;
-
-public class WindowMergerTest {
-
- @Test
- public void test() throws Exception {
- OneInputStreamOperator<StreamWindow<Integer>, StreamWindow<Integer>> windowMerger = new WindowMerger<Integer>();
-
- StreamWindow<Integer> w1 = new StreamWindow<Integer>();
- StreamWindow<Integer> w2 = StreamWindow.fromElements(1, 2, 3, 4);
- StreamWindow<Integer> w3 = StreamWindow.fromElements(-1, 2, 3, 4);
- StreamWindow<Integer> w4_1 = new StreamWindow<Integer>(1, 2);
- StreamWindow<Integer> w4_2 = new StreamWindow<Integer>(1, 2);
- w4_1.add(1);
- w4_2.add(2);
-
- List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
- expected.add(w1);
- expected.add(w2);
- expected.add(w3);
- expected.add(StreamWindow.fromElements(1, 2));
-
- List<StreamWindow<Integer>> input = new ArrayList<StreamWindow<Integer>>();
- input.add(w1);
- input.add(w4_1);
- input.addAll(StreamWindow.split(w2, 2));
- input.addAll(StreamWindow.partitionBy(w3, new KeySelector<Integer, Integer>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Integer getKey(Integer value) throws Exception {
- return value % 2;
- }
- }, false));
- input.add(w4_2);
-
- List<StreamWindow<Integer>> output = MockContext.createAndExecute(windowMerger, input);
-
- assertEquals(expected.size(), expected.size());
- for (int i = 0; i < output.size(); i++) {
- assertEquals(new HashSet<Integer>(expected.get(i)), new HashSet<Integer>(output.get(i)));
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitionerTest.java
deleted file mode 100644
index 7521a2b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitionerTest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.util.MockContext;
-import org.junit.Test;
-
-public class WindowPartitionerTest {
-
- @Test
- public void test() throws Exception {
- OneInputStreamOperator<StreamWindow<Integer>, StreamWindow<Integer>> splitPartitioner = new WindowPartitioner<Integer>(
- 2);
-
- OneInputStreamOperator<StreamWindow<Integer>, StreamWindow<Integer>> gbPartitioner = new WindowPartitioner<Integer>(
- new MyKey());
-
- StreamWindow<Integer> w1 = new StreamWindow<Integer>();
- StreamWindow<Integer> w2 = StreamWindow.fromElements(1, 2, 3, 4);
-
- List<StreamWindow<Integer>> expected1 = new ArrayList<StreamWindow<Integer>>();
- expected1.addAll(StreamWindow.split(w1,2));
- expected1.addAll(StreamWindow.split(w2,2));
-
- List<StreamWindow<Integer>> expected2 = new ArrayList<StreamWindow<Integer>>();
- expected2.addAll(StreamWindow.partitionBy(w1,new MyKey(),false));
- expected2.addAll(StreamWindow.partitionBy(w2,new MyKey(),false));
-
- List<StreamWindow<Integer>> input = new ArrayList<StreamWindow<Integer>>();
- input.add(w1);
- input.add(w2);
-
- List<StreamWindow<Integer>> output1 = MockContext.createAndExecute(splitPartitioner, input);
- List<StreamWindow<Integer>> output2 = MockContext.createAndExecute(gbPartitioner, input);
-
- assertEquals(expected1, output1);
- assertEquals(expected2, output2);
-
- }
-
- private static class MyKey implements KeySelector<Integer, Object> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object getKey(Integer value) throws Exception {
- return value / 2;
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowReducerTest.java
deleted file mode 100644
index b78a5ba..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowReducerTest.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.util.MockContext;
-import org.junit.Test;
-
-public class WindowReducerTest {
-
- @Test
- public void test() {
- OneInputStreamOperator<StreamWindow<Integer>, StreamWindow<Integer>> windowReducer = new WindowReducer<Integer>(
- new ReduceFunction<Integer>() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public Integer reduce(Integer value1, Integer value2) throws Exception {
- return value1 + value2;
- }
- });
-
- List<StreamWindow<Integer>> input = new ArrayList<StreamWindow<Integer>>();
- input.add(StreamWindow.fromElements(1, 2, 3));
- input.add(new StreamWindow<Integer>());
- input.add(StreamWindow.fromElements(-1));
-
- List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
- expected.add(StreamWindow.fromElements(6));
- expected.add(new StreamWindow<Integer>());
- expected.add(StreamWindow.fromElements(-1));
-
- List<StreamWindow<Integer>> output = MockContext.createAndExecute(windowReducer, input);
-
- assertEquals(expected, output);
-
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowingITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowingITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowingITCase.java
deleted file mode 100644
index 4611966..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowingITCase.java
+++ /dev/null
@@ -1,529 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.WindowMapFunction;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.helper.Count;
-import org.apache.flink.streaming.api.windowing.helper.FullStream;
-import org.apache.flink.streaming.api.windowing.helper.Time;
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-public class WindowingITCase extends StreamingMultipleProgramsTestBase {
-
- @SuppressWarnings("serial")
- public static class ModKey implements KeySelector<Integer, Integer> {
- private int m;
-
- public ModKey(int m) {
- this.m = m;
- }
-
- @Override
- public Integer getKey(Integer value) throws Exception {
- return value % m;
- }
- }
-
- @SuppressWarnings("serial")
- public static class IdentityWindowMap implements
- WindowMapFunction<Integer, StreamWindow<Integer>> {
-
- @Override
- public void mapWindow(Iterable<Integer> values, Collector<StreamWindow<Integer>> out)
- throws Exception {
-
- StreamWindow<Integer> window = new StreamWindow<Integer>();
-
- for (Integer value : values) {
- window.add(value);
- }
- out.collect(window);
- }
-
- }
-
- @SuppressWarnings("serial")
- @Test
- public void test() throws Exception {
-
- List<Integer> inputs = new ArrayList<Integer>();
- inputs.add(1);
- inputs.add(2);
- inputs.add(2);
- inputs.add(3);
- inputs.add(4);
- inputs.add(5);
- inputs.add(10);
- inputs.add(11);
- inputs.add(11);
-
- KeySelector<Integer, ?> key = new ModKey(2);
-
- Timestamp<Integer> ts = new IntegerTimestamp();
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(2);
- env.disableOperatorChaining();
-
- DataStream<Integer> source = env.fromCollection(inputs);
-
- source.window(Time.of(3, ts, 1)).every(Time.of(2, ts, 1)).sum(0).getDiscretizedStream()
- .addSink(new TestSink1());
-
- source.window(Time.of(4, ts, 1)).keyBy(new ModKey(2))
- .mapWindow(new IdentityWindowMap())
- .flatten()
- .addSink(new TestSink2()).name("TESTSIUNK2");
-
- source.keyBy(key).window(Time.of(4, ts, 1)).sum(0).getDiscretizedStream()
- .addSink(new TestSink4());
-
- source.keyBy(new ModKey(3)).window(Count.of(2)).keyBy(new ModKey(2))
- .mapWindow(new IdentityWindowMap())
- .flatten()
- .addSink(new TestSink5());
-
- source.window(Time.of(2, ts)).every(Time.of(3, ts)).min(0).getDiscretizedStream()
- .addSink(new TestSink3());
-
- source.keyBy(key).window(Time.of(4, ts, 1)).max(0).getDiscretizedStream()
- .addSink(new TestSink6());
-
- source.window(Time.of(5, ts, 1)).mapWindow(new IdentityWindowMap())
- .flatten()
- .addSink(new TestSink7());
-
- source.window(Time.of(5, ts, 1)).every(Time.of(4, ts, 1)).keyBy(new ModKey(2)).sum(0)
- .getDiscretizedStream()
- .addSink(new TestSink8());
-
- try {
- source.window(FullStream.window()).every(Count.of(2)).getDiscretizedStream();
- fail();
- } catch (Exception e) {
- }
- try {
- source.window(FullStream.window()).getDiscretizedStream();
- fail();
- } catch (Exception e) {
- }
- try {
- source.every(Count.of(5)).mapWindow(new IdentityWindowMap()).getDiscretizedStream();
- fail();
- } catch (Exception e) {
- }
-
- source.every(Count.of(4)).sum(0).getDiscretizedStream().addSink(new TestSink11());
-
- source.window(FullStream.window()).every(Count.of(4)).keyBy(key).sum(0)
- .getDiscretizedStream()
- .addSink(new TestSink12());
-
- DataStream<Integer> source2 = env.addSource(new ParallelSourceFunction<Integer>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void run(SourceContext<Integer> ctx) throws Exception {
- for (int i = 1; i <= 10; i++) {
- ctx.collect(i);
- }
- }
-
- @Override
- public void cancel() {
- }
- });
-
- DataStream<Integer> source3 = env.addSource(new RichParallelSourceFunction<Integer>() {
- private static final long serialVersionUID = 1L;
-
- private int i = 1;
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- i = 1 + getRuntimeContext().getIndexOfThisSubtask();
- }
-
- @Override
- public void cancel() {
- }
-
- @Override
- public void run(SourceContext<Integer> ctx) throws Exception {
- for (;i < 11; i += 2) {
- ctx.collect(i);
- }
-
- }
- });
-
- source2.window(Time.of(2, ts, 1)).sum(0).getDiscretizedStream().addSink(new TestSink9());
-
- source3.window(Time.of(5, ts, 1)).keyBy(new ModKey(2)).sum(0).getDiscretizedStream()
- .addSink(new TestSink10());
-
- source
- .map(new MapFunction<Integer, Integer>() {
- @Override
- public Integer map(Integer value) throws Exception {
- return value;
- }
- })
- .every(Time.of(5, ts, 1)).sum(0).getDiscretizedStream()
- .addSink(new TestSink13());
-
- env.execute();
-
- // sum ( Time of 3 slide 2 )
- List<StreamWindow<Integer>> expected1 = new ArrayList<StreamWindow<Integer>>();
- expected1.add(StreamWindow.fromElements(5));
- expected1.add(StreamWindow.fromElements(11));
- expected1.add(StreamWindow.fromElements(9));
- expected1.add(StreamWindow.fromElements(10));
- expected1.add(StreamWindow.fromElements(32));
-
- validateOutput(expected1, TestSink1.windows);
-
- // Tumbling Time of 4 grouped by mod 2
- List<StreamWindow<Integer>> expected2 = new ArrayList<StreamWindow<Integer>>();
- expected2.add(StreamWindow.fromElements(2, 2, 4));
- expected2.add(StreamWindow.fromElements(1, 3));
- expected2.add(StreamWindow.fromElements(5));
- expected2.add(StreamWindow.fromElements(10));
- expected2.add(StreamWindow.fromElements(11, 11));
-
- validateOutput(expected2, TestSink2.windows);
-
- // groupby mod 2 sum ( Tumbling Time of 4)
- List<StreamWindow<Integer>> expected3 = new ArrayList<StreamWindow<Integer>>();
- expected3.add(StreamWindow.fromElements(4));
- expected3.add(StreamWindow.fromElements(5));
- expected3.add(StreamWindow.fromElements(22));
- expected3.add(StreamWindow.fromElements(8));
- expected3.add(StreamWindow.fromElements(10));
-
- validateOutput(expected3, TestSink4.windows);
-
- // groupby mod3 Tumbling Count of 2 grouped by mod 2
- List<StreamWindow<Integer>> expected4 = new ArrayList<StreamWindow<Integer>>();
- expected4.add(StreamWindow.fromElements(2, 2));
- expected4.add(StreamWindow.fromElements(1));
- expected4.add(StreamWindow.fromElements(4));
- expected4.add(StreamWindow.fromElements(5, 11));
- expected4.add(StreamWindow.fromElements(10));
- expected4.add(StreamWindow.fromElements(11));
- expected4.add(StreamWindow.fromElements(3));
-
- validateOutput(expected4, TestSink5.windows);
-
- // min ( Time of 2 slide 3 )
- List<StreamWindow<Integer>> expected5 = new ArrayList<StreamWindow<Integer>>();
- expected5.add(StreamWindow.fromElements(1));
- expected5.add(StreamWindow.fromElements(4));
- expected5.add(StreamWindow.fromElements(10));
-
- validateOutput(expected5, TestSink3.windows);
-
- // groupby mod 2 max ( Tumbling Time of 4)
- List<StreamWindow<Integer>> expected6 = new ArrayList<StreamWindow<Integer>>();
- expected6.add(StreamWindow.fromElements(3));
- expected6.add(StreamWindow.fromElements(5));
- expected6.add(StreamWindow.fromElements(11));
- expected6.add(StreamWindow.fromElements(4));
- expected6.add(StreamWindow.fromElements(10));
-
- validateOutput(expected6, TestSink6.windows);
-
- List<StreamWindow<Integer>> expected7 = new ArrayList<StreamWindow<Integer>>();
- expected7.add(StreamWindow.fromElements(1, 2, 2, 3, 4, 5));
- expected7.add(StreamWindow.fromElements(10));
- expected7.add(StreamWindow.fromElements(10, 11, 11));
-
- validateOutput(expected7, TestSink7.windows);
-
- List<StreamWindow<Integer>> expected8 = new ArrayList<StreamWindow<Integer>>();
- expected8.add(StreamWindow.fromElements(4, 8));
- expected8.add(StreamWindow.fromElements(4, 5));
- expected8.add(StreamWindow.fromElements(10, 22));
-
- for (List<Integer> sw : TestSink8.windows) {
- Collections.sort(sw);
- }
-
- validateOutput(expected8, TestSink8.windows);
-
- List<StreamWindow<Integer>> expected9 = new ArrayList<StreamWindow<Integer>>();
- expected9.add(StreamWindow.fromElements(6));
- expected9.add(StreamWindow.fromElements(14));
- expected9.add(StreamWindow.fromElements(22));
- expected9.add(StreamWindow.fromElements(30));
- expected9.add(StreamWindow.fromElements(38));
-
- validateOutput(expected9, TestSink9.windows);
-
- List<StreamWindow<Integer>> expected10 = new ArrayList<StreamWindow<Integer>>();
- expected10.add(StreamWindow.fromElements(6, 9));
- expected10.add(StreamWindow.fromElements(16, 24));
-
- for (List<Integer> sw : TestSink10.windows) {
- Collections.sort(sw);
- }
-
- validateOutput(expected10, TestSink10.windows);
-
- List<StreamWindow<Integer>> expected11 = new ArrayList<StreamWindow<Integer>>();
- expected11.add(StreamWindow.fromElements(8));
- expected11.add(StreamWindow.fromElements(38));
- expected11.add(StreamWindow.fromElements(49));
-
- for (List<Integer> sw : TestSink11.windows) {
- Collections.sort(sw);
- }
-
- validateOutput(expected11, TestSink11.windows);
-
- List<StreamWindow<Integer>> expected12 = new ArrayList<StreamWindow<Integer>>();
- expected12.add(StreamWindow.fromElements(4, 4));
- expected12.add(StreamWindow.fromElements(18, 20));
- expected12.add(StreamWindow.fromElements(18, 31));
-
- for (List<Integer> sw : TestSink12.windows) {
- Collections.sort(sw);
- }
-
- validateOutput(expected12, TestSink12.windows);
-
- List<StreamWindow<Integer>> expected13 = new ArrayList<StreamWindow<Integer>>();
- expected13.add(StreamWindow.fromElements(17));
- expected13.add(StreamWindow.fromElements(27));
- expected13.add(StreamWindow.fromElements(49));
-
- for (List<Integer> sw : TestSink13.windows) {
- Collections.sort(sw);
- }
-
- validateOutput(expected13, TestSink13.windows);
-
- }
-
- public static <R> void validateOutput(List<R> expected, List<R> actual) {
- assertEquals(new HashSet<R>(expected), new HashSet<R>(actual));
- }
-
- @SuppressWarnings("serial")
- private static class TestSink1 implements SinkFunction<StreamWindow<Integer>> {
-
- public static List<StreamWindow<Integer>> windows = Collections
- .synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
- @Override
- public void invoke(StreamWindow<Integer> value) throws Exception {
- windows.add(value);
- }
-
- }
-
- @SuppressWarnings("serial")
- private static class TestSink2 implements SinkFunction<StreamWindow<Integer>> {
-
- public static List<StreamWindow<Integer>> windows = Collections
- .synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
- @Override
- public void invoke(StreamWindow<Integer> value) throws Exception {
- windows.add(value);
- }
-
- }
-
- @SuppressWarnings("serial")
- private static class TestSink3 implements SinkFunction<StreamWindow<Integer>> {
-
- public static List<StreamWindow<Integer>> windows = Collections
- .synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
- @Override
- public void invoke(StreamWindow<Integer> value) throws Exception {
- windows.add(value);
- }
-
- }
-
- @SuppressWarnings("serial")
- private static class TestSink4 implements SinkFunction<StreamWindow<Integer>> {
-
- public static List<StreamWindow<Integer>> windows = Collections
- .synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
- @Override
- public void invoke(StreamWindow<Integer> value) throws Exception {
- windows.add(value);
- }
-
- }
-
- @SuppressWarnings("serial")
- private static class TestSink5 implements SinkFunction<StreamWindow<Integer>> {
-
- public static List<StreamWindow<Integer>> windows = Collections
- .synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
- @Override
- public void invoke(StreamWindow<Integer> value) throws Exception {
- windows.add(value);
- }
-
- }
-
- @SuppressWarnings("serial")
- private static class TestSink6 implements SinkFunction<StreamWindow<Integer>> {
-
- public static List<StreamWindow<Integer>> windows = Collections
- .synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
- @Override
- public void invoke(StreamWindow<Integer> value) throws Exception {
- windows.add(value);
- }
-
- }
-
- @SuppressWarnings("serial")
- private static class TestSink7 implements SinkFunction<StreamWindow<Integer>> {
-
- public static List<StreamWindow<Integer>> windows = Collections
- .synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
- @Override
- public void invoke(StreamWindow<Integer> value) throws Exception {
- windows.add(value);
- }
-
- }
-
- @SuppressWarnings("serial")
- private static class TestSink8 implements SinkFunction<StreamWindow<Integer>> {
-
- public static List<StreamWindow<Integer>> windows = Collections
- .synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
- @Override
- public void invoke(StreamWindow<Integer> value) throws Exception {
- windows.add(value);
- }
-
- }
-
- @SuppressWarnings("serial")
- private static class TestSink9 implements SinkFunction<StreamWindow<Integer>> {
-
- public static List<StreamWindow<Integer>> windows = Collections
- .synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
- @Override
- public void invoke(StreamWindow<Integer> value) throws Exception {
- windows.add(value);
- }
-
- }
-
- @SuppressWarnings("serial")
- private static class TestSink10 implements SinkFunction<StreamWindow<Integer>> {
-
- public static List<StreamWindow<Integer>> windows = Collections
- .synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
- @Override
- public void invoke(StreamWindow<Integer> value) throws Exception {
- windows.add(value);
- }
-
- }
-
- @SuppressWarnings("serial")
- private static class TestSink11 implements SinkFunction<StreamWindow<Integer>> {
-
- public static List<StreamWindow<Integer>> windows = Collections
- .synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
- @Override
- public void invoke(StreamWindow<Integer> value) throws Exception {
- windows.add(value);
- }
-
- }
-
- @SuppressWarnings("serial")
- private static class TestSink12 implements SinkFunction<StreamWindow<Integer>> {
-
- public static List<StreamWindow<Integer>> windows = Collections
- .synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
- @Override
- public void invoke(StreamWindow<Integer> value) throws Exception {
- windows.add(value);
- }
-
- }
-
- @SuppressWarnings("serial")
- private static class TestSink13 implements SinkFunction<StreamWindow<Integer>> {
-
- public static List<StreamWindow<Integer>> windows = Collections
- .synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
- @Override
- public void invoke(StreamWindow<Integer> value) throws Exception {
- windows.add(value);
- }
-
- }
-
- private static class IntegerTimestamp implements Timestamp<Integer> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public long getTimestamp(Integer value) {
- return value;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/StreamWindowTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/StreamWindowTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/StreamWindowTest.java
deleted file mode 100644
index c3efc7b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/StreamWindowTest.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.StreamWindowSerializer;
-import org.junit.Test;
-
-public class StreamWindowTest {
-
- @Test
- public void creationTest() {
-
- StreamWindow<Integer> window1 = new StreamWindow<Integer>();
- assertTrue(window1.isEmpty());
- assertTrue(window1.windowID != 0);
-
- window1.add(10);
- assertEquals(1, window1.size());
-
- StreamWindow<Integer> window2 = new StreamWindow<Integer>(window1);
-
- assertTrue(window1.windowID == window2.windowID);
- assertEquals(1, window2.size());
-
- StreamWindow<Integer> window3 = new StreamWindow<Integer>(100);
- assertEquals(100, window3.windowID);
-
- StreamWindow<Integer> window4 = new StreamWindow<Integer>();
- assertFalse(window4.windowID == window1.windowID);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void mergeTest() throws IOException {
- StreamWindow<Integer> window1 = new StreamWindow<Integer>().setNumberOfParts(3);
- StreamWindow<Integer> window2 = new StreamWindow<Integer>(window1.windowID, 3);
- StreamWindow<Integer> window3 = new StreamWindow<Integer>(window1.windowID, 3);
-
- window1.add(1);
- window2.add(2);
- window3.add(3);
-
- Set<Integer> values = new HashSet<Integer>();
- values.add(1);
- values.add(2);
- values.add(3);
-
- StreamWindow<Integer> merged = StreamWindow.merge(window1, window2, window3);
-
- assertEquals(3, merged.size());
- assertEquals(window1.windowID, merged.windowID);
- assertEquals(values, new HashSet<Integer>(merged));
-
- try {
- StreamWindow.merge(window1, new StreamWindow<Integer>());
- fail();
- } catch (RuntimeException e) {
- // good
- }
-
- List<StreamWindow<Integer>> wList = StreamWindow.split(merged,3);
-
- StreamWindow<Integer> merged2 = StreamWindow.merge(wList);
-
- assertEquals(3, merged2.size());
- assertEquals(window1.windowID, merged2.windowID);
- assertEquals(values, new HashSet<Integer>(merged2));
-
- }
-
- @Test
- public void serializerTest() throws IOException {
-
- StreamWindow<Integer> streamWindow = new StreamWindow<Integer>();
- streamWindow.add(1);
- streamWindow.add(2);
- streamWindow.add(3);
-
- TypeSerializer<StreamWindow<Integer>> ts = new StreamWindowSerializer<Integer>(
- BasicTypeInfo.INT_TYPE_INFO, null);
-
- TestOutputView ow = new TestOutputView();
-
- ts.serialize(streamWindow, ow);
-
- TestInputView iw = ow.getInputView();
-
- assertEquals(streamWindow, ts.deserialize(iw));
-
- }
-
- @Test
- public void partitionTest() {
- StreamWindow<Integer> streamWindow = new StreamWindow<Integer>();
- streamWindow.add(1);
- streamWindow.add(2);
- streamWindow.add(3);
- streamWindow.add(4);
- streamWindow.add(5);
- streamWindow.add(6);
-
- List<StreamWindow<Integer>> split = StreamWindow.split(streamWindow,2);
- assertEquals(2, split.size());
- assertEquals(StreamWindow.fromElements(1, 2, 3), split.get(0));
- assertEquals(StreamWindow.fromElements(4, 5, 6), split.get(1));
-
- List<StreamWindow<Integer>> split2 = StreamWindow.split(streamWindow,6);
- assertEquals(6, split2.size());
- assertEquals(StreamWindow.fromElements(1), split2.get(0));
- assertEquals(StreamWindow.fromElements(2), split2.get(1));
- assertEquals(StreamWindow.fromElements(3), split2.get(2));
- assertEquals(StreamWindow.fromElements(4), split2.get(3));
- assertEquals(StreamWindow.fromElements(5), split2.get(4));
- assertEquals(StreamWindow.fromElements(6), split2.get(5));
-
- List<StreamWindow<Integer>> split3 = StreamWindow.split(streamWindow,10);
- assertEquals(6, split3.size());
- assertEquals(StreamWindow.fromElements(1), split3.get(0));
- assertEquals(StreamWindow.fromElements(2), split3.get(1));
- assertEquals(StreamWindow.fromElements(3), split3.get(2));
- assertEquals(StreamWindow.fromElements(4), split3.get(3));
- assertEquals(StreamWindow.fromElements(5), split3.get(4));
- assertEquals(StreamWindow.fromElements(6), split3.get(5));
-
- }
-
- private class TestOutputView extends DataOutputStream implements DataOutputView {
-
- public TestOutputView() {
- super(new ByteArrayOutputStream(4096));
- }
-
- public TestInputView getInputView() {
- ByteArrayOutputStream baos = (ByteArrayOutputStream) out;
- return new TestInputView(baos.toByteArray());
- }
-
- @Override
- public void skipBytesToWrite(int numBytes) throws IOException {
- for (int i = 0; i < numBytes; i++) {
- write(0);
- }
- }
-
- @Override
- public void write(DataInputView source, int numBytes) throws IOException {
- byte[] buffer = new byte[numBytes];
- source.readFully(buffer);
- write(buffer);
- }
- }
-
- private class TestInputView extends DataInputStream implements DataInputView {
-
- public TestInputView(byte[] data) {
- super(new ByteArrayInputStream(data));
- }
-
- @Override
- public void skipBytesToRead(int numBytes) throws IOException {
- while (numBytes > 0) {
- int skipped = skipBytes(numBytes);
- numBytes -= skipped;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/StreamWindowTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/StreamWindowTypeInfoTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/StreamWindowTypeInfoTest.java
deleted file mode 100644
index 2e98a8f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/StreamWindowTypeInfoTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing;
-
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.api.java.typeutils.PojoField;
-import org.apache.flink.api.java.typeutils.PojoTypeInfo;
-import org.apache.flink.util.TestLogger;
-import org.junit.Test;
-
-import java.util.ArrayList;
-
-import static org.junit.Assert.*;
-
-public class StreamWindowTypeInfoTest extends TestLogger {
-
- public static class TestClass{}
-
- @Test
- public void testStreamWindowTypeInfoEquality() {
- StreamWindowTypeInfo<TestClass> tpeInfo1 = new StreamWindowTypeInfo<>(new GenericTypeInfo<>(TestClass.class));
- StreamWindowTypeInfo<TestClass> tpeInfo2 = new StreamWindowTypeInfo<>(new GenericTypeInfo<>(TestClass.class));
-
- assertEquals(tpeInfo1, tpeInfo2);
- assertEquals(tpeInfo1.hashCode(), tpeInfo2.hashCode());
- }
-
- @Test
- public void testStreamWindowTypeInfoInequality() {
- StreamWindowTypeInfo<TestClass> tpeInfo1 = new StreamWindowTypeInfo<>(new GenericTypeInfo<>(TestClass.class));
- StreamWindowTypeInfo<TestClass> tpeInfo2 = new StreamWindowTypeInfo<>(new PojoTypeInfo<>(TestClass.class, new ArrayList<PojoField>()));
-
- assertNotEquals(tpeInfo1, tpeInfo2);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/ArrayFromTupleTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/ArrayFromTupleTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/ArrayFromTupleTest.java
deleted file mode 100644
index 17d3974..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/ArrayFromTupleTest.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.extractor;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple10;
-import org.apache.flink.api.java.tuple.Tuple11;
-import org.apache.flink.api.java.tuple.Tuple12;
-import org.apache.flink.api.java.tuple.Tuple13;
-import org.apache.flink.api.java.tuple.Tuple14;
-import org.apache.flink.api.java.tuple.Tuple15;
-import org.apache.flink.api.java.tuple.Tuple16;
-import org.apache.flink.api.java.tuple.Tuple17;
-import org.apache.flink.api.java.tuple.Tuple18;
-import org.apache.flink.api.java.tuple.Tuple19;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple20;
-import org.apache.flink.api.java.tuple.Tuple21;
-import org.apache.flink.api.java.tuple.Tuple22;
-import org.apache.flink.api.java.tuple.Tuple23;
-import org.apache.flink.api.java.tuple.Tuple24;
-import org.apache.flink.api.java.tuple.Tuple25;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.tuple.Tuple6;
-import org.apache.flink.api.java.tuple.Tuple7;
-import org.apache.flink.api.java.tuple.Tuple8;
-import org.apache.flink.api.java.tuple.Tuple9;
-import org.apache.flink.streaming.api.windowing.extractor.ArrayFromTuple;
-import org.junit.Before;
-import org.junit.Test;
-
-public class ArrayFromTupleTest {
-
- private String[] testStrings;
-
- @Before
- public void init() {
- testStrings = new String[Tuple.MAX_ARITY];
- for (int i = 0; i < Tuple.MAX_ARITY; i++) {
- testStrings[i] = Integer.toString(i);
- }
- }
-
- @Test
- public void testConvertFromTupleToArray() throws InstantiationException, IllegalAccessException {
- for (int i = 0; i < Tuple.MAX_ARITY; i++) {
- Tuple currentTuple = (Tuple) CLASSES[i].newInstance();
- String[] currentArray = new String[i + 1];
- for (int j = 0; j <= i; j++) {
- currentTuple.setField(testStrings[j], j);
- currentArray[j] = testStrings[j];
- }
- arrayEqualityCheck(currentArray, new ArrayFromTuple().extract(currentTuple));
- }
- }
-
- @Test
- public void testUserSpecifiedOrder() throws InstantiationException, IllegalAccessException {
- Tuple currentTuple = (Tuple) CLASSES[Tuple.MAX_ARITY - 1].newInstance();
- for (int i = 0; i < Tuple.MAX_ARITY; i++) {
- currentTuple.setField(testStrings[i], i);
- }
-
- String[] expected = { testStrings[5], testStrings[3], testStrings[6], testStrings[7],
- testStrings[0] };
- arrayEqualityCheck(expected, new ArrayFromTuple(5, 3, 6, 7, 0).extract(currentTuple));
-
- String[] expected2 = { testStrings[0], testStrings[Tuple.MAX_ARITY - 1] };
- arrayEqualityCheck(expected2,
- new ArrayFromTuple(0, Tuple.MAX_ARITY - 1).extract(currentTuple));
-
- String[] expected3 = { testStrings[Tuple.MAX_ARITY - 1], testStrings[0] };
- arrayEqualityCheck(expected3,
- new ArrayFromTuple(Tuple.MAX_ARITY - 1, 0).extract(currentTuple));
-
- String[] expected4 = { testStrings[13], testStrings[4], testStrings[5], testStrings[4],
- testStrings[2], testStrings[8], testStrings[6], testStrings[2], testStrings[8],
- testStrings[3], testStrings[5], testStrings[2], testStrings[16], testStrings[4],
- testStrings[3], testStrings[2], testStrings[6], testStrings[4], testStrings[7],
- testStrings[4], testStrings[2], testStrings[8], testStrings[7], testStrings[2] };
- arrayEqualityCheck(expected4, new ArrayFromTuple(13, 4, 5, 4, 2, 8, 6, 2, 8, 3, 5, 2, 16,
- 4, 3, 2, 6, 4, 7, 4, 2, 8, 7, 2).extract(currentTuple));
- }
-
- private void arrayEqualityCheck(Object[] array1, Object[] array2) {
- assertEquals("The result arrays must have the same length", array1.length, array2.length);
- for (int i = 0; i < array1.length; i++) {
- assertEquals("Unequal fields at position " + i, array1[i], array2[i]);
- }
- }
-
- private static final Class<?>[] CLASSES = new Class<?>[] { Tuple1.class, Tuple2.class,
- Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class,
- Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class,
- Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class,
- Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class,
- Tuple24.class, Tuple25.class };
-}