You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/09 12:16:39 UTC
[03/10] flink git commit: [FLINK-2780] Remove Old Windowing Logic and
API
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/ConcatenatedExtractTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/ConcatenatedExtractTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/ConcatenatedExtractTest.java
deleted file mode 100644
index e99de38..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/ConcatenatedExtractTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.extractor;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.junit.Before;
-import org.junit.Test;
-
-public class ConcatenatedExtractTest {
-
- private String[] testStringArray1 = { "1", "2", "3" };
- private int[] testIntArray1 = { 1, 2, 3 };
- private String[] testStringArray2 = { "4", "5", "6" };
- private int[] testIntArray2 = { 4, 5, 6 };
- private String[] testStringArray3 = { "7", "8", "9" };
- private int[] testIntArray3 = { 7, 8, 9 };
- private Tuple2<String[], int[]>[] testTuple2Array;
- private Tuple2<String[], int[]> testTuple2;
- private Tuple2<Tuple2<String[], int[]>, Tuple2<String[], int[]>[]> testData;
-
- @SuppressWarnings("unchecked")
- @Before
- public void setupData() {
- testTuple2Array = new Tuple2[2];
- testTuple2Array[0] = new Tuple2<String[], int[]>(testStringArray1, testIntArray2);
- testTuple2Array[1] = new Tuple2<String[], int[]>(testStringArray2, testIntArray1);
-
- testTuple2 = new Tuple2<String[], int[]>(testStringArray3, testIntArray3);
-
- testData = new Tuple2<Tuple2<String[], int[]>, Tuple2<String[], int[]>[]>(testTuple2,
- testTuple2Array);
- }
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
- public void test1() {
- Extractor ext = new ConcatenatedExtract(new FieldFromTuple(0), new FieldFromTuple(1))
- .add(new FieldsFromArray(Integer.class, 2, 1, 0));
- int[] expected = { testIntArray3[2], testIntArray3[1], testIntArray3[0] };
- assertEquals(new Integer(expected[0]), ((Integer[]) ext.extract(testData))[0]);
- assertEquals(new Integer(expected[1]), ((Integer[]) ext.extract(testData))[1]);
- assertEquals(new Integer(expected[2]), ((Integer[]) ext.extract(testData))[2]);
- }
-
- @SuppressWarnings({ "unchecked", "rawtypes" })
- @Test
- public void test2() {
- Extractor ext = new ConcatenatedExtract(new FieldFromTuple(1), // Tuple2<String[],int[]>[]
- new FieldsFromArray(Tuple2.class, 1)) // Tuple2<String[],int[]>[]
- .add(new FieldFromArray(0)) // Tuple2<String[],int[]>
- .add(new ArrayFromTuple(0)) // Object[] (Containing String[])
- .add(new FieldFromArray(0)) // String[]
- .add(new FieldFromArray(1)); // String
-
- String expected2 = testStringArray2[1];
- assertEquals(expected2, ext.extract(testData));
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromArrayTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromArrayTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromArrayTest.java
deleted file mode 100644
index 2d4dbcf..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromArrayTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.extractor;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.streaming.api.windowing.extractor.FieldFromArray;
-import org.junit.Test;
-
-public class FieldFromArrayTest {
-
- String[] testStringArray = { "0", "1", "2", "3", "4" };
- Integer[] testIntegerArray = { 10, 11, 12, 13, 14 };
- int[] testIntArray = { 20, 21, 22, 23, 24 };
-
- @Test
- public void testStringArray() {
- for (int i = 0; i < this.testStringArray.length; i++) {
- assertEquals(this.testStringArray[i],
- new FieldFromArray<String>(i).extract(testStringArray));
- }
- }
-
- @Test
- public void testIntegerArray() {
- for (int i = 0; i < this.testIntegerArray.length; i++) {
- assertEquals(this.testIntegerArray[i],
- new FieldFromArray<String>(i).extract(testIntegerArray));
- }
- }
-
- @Test
- public void testIntArray() {
- for (int i = 0; i < this.testIntArray.length; i++) {
- assertEquals(new Integer(this.testIntArray[i]),
- new FieldFromArray<Integer>(i).extract(testIntArray));
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromTupleTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromTupleTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromTupleTest.java
deleted file mode 100644
index 528611a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromTupleTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.extractor;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple10;
-import org.apache.flink.api.java.tuple.Tuple11;
-import org.apache.flink.api.java.tuple.Tuple12;
-import org.apache.flink.api.java.tuple.Tuple13;
-import org.apache.flink.api.java.tuple.Tuple14;
-import org.apache.flink.api.java.tuple.Tuple15;
-import org.apache.flink.api.java.tuple.Tuple16;
-import org.apache.flink.api.java.tuple.Tuple17;
-import org.apache.flink.api.java.tuple.Tuple18;
-import org.apache.flink.api.java.tuple.Tuple19;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple20;
-import org.apache.flink.api.java.tuple.Tuple21;
-import org.apache.flink.api.java.tuple.Tuple22;
-import org.apache.flink.api.java.tuple.Tuple23;
-import org.apache.flink.api.java.tuple.Tuple24;
-import org.apache.flink.api.java.tuple.Tuple25;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.tuple.Tuple6;
-import org.apache.flink.api.java.tuple.Tuple7;
-import org.apache.flink.api.java.tuple.Tuple8;
-import org.apache.flink.api.java.tuple.Tuple9;
-import org.apache.flink.streaming.api.windowing.extractor.FieldFromTuple;
-import org.junit.Before;
-import org.junit.Test;
-
-public class FieldFromTupleTest {
-
- private String[] testStrings;
-
- @Before
- public void init() {
- testStrings = new String[Tuple.MAX_ARITY];
- for (int i = 0; i < Tuple.MAX_ARITY; i++) {
- testStrings[i] = Integer.toString(i);
- }
- }
-
- @Test
- public void testSingleFieldExtraction() throws InstantiationException, IllegalAccessException {
- // extract single fields
- for (int i = 0; i < Tuple.MAX_ARITY; i++) {
- Tuple current = (Tuple) CLASSES[i].newInstance();
- for (int j = 0; j < i; j++) {
- current.setField(testStrings[j], j);
- }
- for (int j = 0; j < i; j++) {
- assertEquals(testStrings[j], new FieldFromTuple<String>(j).extract(current));
- }
- }
- }
-
- private static final Class<?>[] CLASSES = new Class<?>[] { Tuple1.class, Tuple2.class,
- Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class,
- Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class,
- Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class,
- Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class,
- Tuple24.class, Tuple25.class };
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromArrayTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromArrayTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromArrayTest.java
deleted file mode 100644
index 3139aa5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromArrayTest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.extractor;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.flink.streaming.api.windowing.extractor.FieldsFromArray;
-import org.junit.Test;
-
-public class FieldsFromArrayTest {
-
- String[] testStringArray = { "0", "1", "2", "3", "4" };
- Integer[] testIntegerArray = { 10, 11, 12, 13, 14 };
- int[] testIntArray = { 20, 21, 22, 23, 24 };
-
- @Test
- public void testStringArray() {
- // check single field extraction
- for (int i = 0; i < testStringArray.length; i++) {
- String[] tmp = { testStringArray[i] };
- arrayEqualityCheck(tmp,
- new FieldsFromArray<String>(String.class, i).extract(testStringArray));
- }
-
- // check reverse order
- String[] reverseOrder = new String[testStringArray.length];
- for (int i = 0; i < testStringArray.length; i++) {
- reverseOrder[i] = testStringArray[testStringArray.length - i - 1];
- }
- arrayEqualityCheck(reverseOrder,
- new FieldsFromArray<String>(String.class, 4, 3, 2, 1, 0).extract(testStringArray));
-
- // check picking fields and reorder
- String[] crazyOrder = { testStringArray[4], testStringArray[1], testStringArray[2] };
- arrayEqualityCheck(crazyOrder,
- new FieldsFromArray<String>(String.class, 4, 1, 2).extract(testStringArray));
- }
-
- @Test
- public void testIntegerArray() {
- // check single field extraction
- for (int i = 0; i < testIntegerArray.length; i++) {
- Integer[] tmp = { testIntegerArray[i] };
- arrayEqualityCheck(tmp,
- new FieldsFromArray<Integer>(Integer.class, i).extract(testIntegerArray));
- }
-
- // check reverse order
- Integer[] reverseOrder = new Integer[testIntegerArray.length];
- for (int i = 0; i < testIntegerArray.length; i++) {
- reverseOrder[i] = testIntegerArray[testIntegerArray.length - i - 1];
- }
- arrayEqualityCheck(reverseOrder,
- new FieldsFromArray<Integer>(Integer.class, 4, 3, 2, 1, 0)
- .extract(testIntegerArray));
-
- // check picking fields and reorder
- Integer[] crazyOrder = { testIntegerArray[4], testIntegerArray[1], testIntegerArray[2] };
- arrayEqualityCheck(crazyOrder,
- new FieldsFromArray<Integer>(Integer.class, 4, 1, 2).extract(testIntegerArray));
-
- }
-
- @Test
- public void testIntArray() {
- for (int i = 0; i < testIntArray.length; i++) {
- Integer[] tmp = { testIntArray[i] };
- arrayEqualityCheck(tmp,
- new FieldsFromArray<Integer>(Integer.class, i).extract(testIntArray));
- }
-
- // check reverse order
- Integer[] reverseOrder = new Integer[testIntArray.length];
- for (int i = 0; i < testIntArray.length; i++) {
- reverseOrder[i] = testIntArray[testIntArray.length - i - 1];
- }
- arrayEqualityCheck(reverseOrder,
- new FieldsFromArray<Integer>(Integer.class, 4, 3, 2, 1, 0).extract(testIntArray));
-
- // check picking fields and reorder
- Integer[] crazyOrder = { testIntArray[4], testIntArray[1], testIntArray[2] };
- arrayEqualityCheck(crazyOrder,
- new FieldsFromArray<Integer>(Integer.class, 4, 1, 2).extract(testIntArray));
-
- }
-
- private void arrayEqualityCheck(Object[] array1, Object[] array2) {
- assertEquals("The result arrays must have the same length", array1.length, array2.length);
- for (int i = 0; i < array1.length; i++) {
- assertEquals("Unequal fields at position " + i, array1[i], array2[i]);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromTupleTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromTupleTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromTupleTest.java
deleted file mode 100644
index 0379fe0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromTupleTest.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.extractor;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple10;
-import org.apache.flink.api.java.tuple.Tuple11;
-import org.apache.flink.api.java.tuple.Tuple12;
-import org.apache.flink.api.java.tuple.Tuple13;
-import org.apache.flink.api.java.tuple.Tuple14;
-import org.apache.flink.api.java.tuple.Tuple15;
-import org.apache.flink.api.java.tuple.Tuple16;
-import org.apache.flink.api.java.tuple.Tuple17;
-import org.apache.flink.api.java.tuple.Tuple18;
-import org.apache.flink.api.java.tuple.Tuple19;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple20;
-import org.apache.flink.api.java.tuple.Tuple21;
-import org.apache.flink.api.java.tuple.Tuple22;
-import org.apache.flink.api.java.tuple.Tuple23;
-import org.apache.flink.api.java.tuple.Tuple24;
-import org.apache.flink.api.java.tuple.Tuple25;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.tuple.Tuple6;
-import org.apache.flink.api.java.tuple.Tuple7;
-import org.apache.flink.api.java.tuple.Tuple8;
-import org.apache.flink.api.java.tuple.Tuple9;
-import org.apache.flink.streaming.api.windowing.extractor.FieldsFromTuple;
-import org.junit.Before;
-import org.junit.Test;
-
-public class FieldsFromTupleTest {
-
- private double[] testDouble;
-
- @Before
- public void init() {
- testDouble = new double[Tuple.MAX_ARITY];
- for (int i = 0; i < Tuple.MAX_ARITY; i++) {
- testDouble[i] = i;
- }
- }
-
- @Test
- public void testUserSpecifiedOrder() throws InstantiationException, IllegalAccessException {
- Tuple currentTuple = (Tuple) CLASSES[Tuple.MAX_ARITY - 1].newInstance();
- for (int i = 0; i < Tuple.MAX_ARITY; i++) {
- currentTuple.setField(testDouble[i], i);
- }
-
- double[] expected = { testDouble[5], testDouble[3], testDouble[6], testDouble[7],
- testDouble[0] };
- arrayEqualityCheck(expected, new FieldsFromTuple(5, 3, 6, 7, 0).extract(currentTuple));
-
- double[] expected2 = { testDouble[0], testDouble[Tuple.MAX_ARITY - 1] };
- arrayEqualityCheck(expected2,
- new FieldsFromTuple(0, Tuple.MAX_ARITY - 1).extract(currentTuple));
-
- double[] expected3 = { testDouble[Tuple.MAX_ARITY - 1], testDouble[0] };
- arrayEqualityCheck(expected3,
- new FieldsFromTuple(Tuple.MAX_ARITY - 1, 0).extract(currentTuple));
-
- double[] expected4 = { testDouble[13], testDouble[4], testDouble[5], testDouble[4],
- testDouble[2], testDouble[8], testDouble[6], testDouble[2], testDouble[8],
- testDouble[3], testDouble[5], testDouble[2], testDouble[16], testDouble[4],
- testDouble[3], testDouble[2], testDouble[6], testDouble[4], testDouble[7],
- testDouble[4], testDouble[2], testDouble[8], testDouble[7], testDouble[2] };
- arrayEqualityCheck(expected4, new FieldsFromTuple(13, 4, 5, 4, 2, 8, 6, 2, 8, 3, 5, 2, 16,
- 4, 3, 2, 6, 4, 7, 4, 2, 8, 7, 2).extract(currentTuple));
- }
-
- private void arrayEqualityCheck(double[] array1, double[] array2) {
- assertEquals("The result arrays must have the same length", array1.length, array2.length);
- for (int i = 0; i < array1.length; i++) {
- assertEquals("Unequal fields at position " + i, array1[i], array2[i], 0d);
- }
- }
-
- private static final Class<?>[] CLASSES = new Class<?>[] { Tuple1.class, Tuple2.class,
- Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class,
- Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class,
- Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class,
- Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class,
- Tuple24.class, Tuple25.class };
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicyTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicyTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicyTest.java
deleted file mode 100644
index 8a7a011..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicyTest.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-public class CountEvictionPolicyTest {
-
- @SuppressWarnings({ "unchecked", "rawtypes" })
- @Test
- public void testCountEvictionPolicy() {
- List<Integer> tuples = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
- int counter;
-
- // The count policy should not care about the triggered parameter
- // Therefore its value switches after each use in this test.
- boolean triggered = false;
- // the size of the buffer should not matter as well!
-
- // Test count of different sizes (0..9)
- for (int i = 0; i < 10; i++) {
- EvictionPolicy evictionPolicy = new CountEvictionPolicy(i, i);
- counter = 0;
-
- // Test first i steps (should not evict)
- for (int j = 0; j < i; j++) {
- counter++;
- assertEquals("Evictionpolicy with count of " + i + " evicted tuples at add nr. "
- + counter + ". It should not evict for the first " + i + " adds.", 0,
- evictionPolicy.notifyEviction(tuples.get(j), (triggered = !triggered),
- tuples.get(Math.abs((i - j)) % 10)));
- }
-
- // Test the next three evictions
- for (int j = 0; j < 3; j++) {
- // The first add should evict now
- counter++;
- assertEquals("Evictionpolicy with count of " + i
- + " did not evict correct number of tuples at the expected pos " + counter
- + ".", i, evictionPolicy.notifyEviction(tuples.get(j),
- (triggered = !triggered), tuples.get(Math.abs((i - j)) % 10)));
-
- // the next i-1 adds should not evict
- for (int k = 0; k < i - 1; k++) {
- counter++;
- assertEquals("Evictionpolicy with count of " + i
- + " evicted tuples at add nr. " + counter, 0,
- evictionPolicy.notifyEviction(tuples.get(j), (triggered = !triggered),
- tuples.get(Math.abs((i - j)) % 10)));
- }
- }
- }
- }
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
- public void testCountEvictionPolicyStartValuesAndEvictionAmount() {
-
- // The count policy should not care about the triggered parameter
- // Therefore its value switches after each use in this test.
- boolean triggered = false;
- // the size of the buffer should not matter as well!
-
- List<Integer> tuples = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
-
- // Text different eviction amounts (0..3)
- for (int x = 0; x < 4; x++) {
-
- // Test count of different sizes (0..9)
- for (int i = 0; i < 10; i++) {
-
- int counter = 0;
-
- // Test different start values (-5..5)
- for (int j = -5; i < 6; i++) {
- EvictionPolicy evictionPolicy = new CountEvictionPolicy(i, x, j);
- // Add tuples without eviction
- for (int k = 0; k < ((i - j > 0) ? i - j : 0); k++) {
- counter++;
- assertEquals("Evictionpolicy with count of " + i
- + " did not evict correct number of tuples at the expected pos "
- + counter + ".", 0, evictionPolicy.notifyEviction(
- tuples.get(Math.abs(j)), (triggered = !triggered),
- tuples.get(Math.abs((i - j)) % 10)));
- }
- // Expect eviction
- counter++;
- assertEquals("Evictionpolicy with count of " + i
- + " did not evict correct number of tuples at the expected pos "
- + counter + ".", x, evictionPolicy.notifyEviction(
- tuples.get(Math.abs(j)), (triggered = !triggered),
- tuples.get(Math.abs((i - j)) % 10)));
- }
- }
- }
- }
-
- @Test
- public void equalityTest() {
- assertEquals(new CountEvictionPolicy<Integer>(5, 5, 5), new CountEvictionPolicy<Integer>(5,
- 5, 5));
-
- assertEquals(new CountEvictionPolicy<Integer>(5, 5), new CountEvictionPolicy<Integer>(5, 5));
- assertEquals(new CountEvictionPolicy<Integer>(5), new CountEvictionPolicy<Integer>(5));
-
- assertNotEquals(new CountEvictionPolicy<Integer>(4, 5, 5),
- new CountEvictionPolicy<Integer>(5, 5, 5));
- assertNotEquals(new CountEvictionPolicy<Integer>(5, 5, 5),
- new CountEvictionPolicy<Integer>(5, 4, 5));
-
- assertNotEquals(new CountEvictionPolicy<Integer>(5, 5, 5),
- new CountEvictionPolicy<Integer>(5, 5, 4));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicyTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicyTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicyTest.java
deleted file mode 100644
index ce5ae3b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicyTest.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.streaming.api.windowing.helper.Count;
-import org.junit.Test;
-
-public class CountTriggerPolicyTest {
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
- public void testCountTriggerPolicy() {
-
- List tuples = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
- int counter;
-
- // Test count of different sizes (0..9)
- for (int i = 0; i < 10; i++) {
- TriggerPolicy triggerPolicy = Count.of(i).toTrigger();
- counter = 0;
-
- // Test first i steps (should not trigger)
- for (int j = 0; j < i; j++) {
- counter++;
- assertFalse("Triggerpolicy with count of " + i + " triggered at add nr. " + counter
- + ". It should not trigger for the first " + i + " adds.",
- triggerPolicy.notifyTrigger(tuples.get(j)));
- }
-
- // Test the next three triggers
- for (int j = 0; j < 3; j++) {
- // The first add should trigger now
- counter++;
- assertTrue("Triggerpolicy with count of " + i
- + " did not trigger at the expected pos " + counter + ".",
- triggerPolicy.notifyTrigger(tuples.get(j)));
-
- // the next i-1 adds should not trigger
- for (int k = 0; k < i - 1; k++) {
- counter++;
- assertFalse("Triggerpolicy with count of " + i + " triggered at add nr. "
- + counter, triggerPolicy.notifyTrigger(tuples.get(k)));
- }
- }
- }
- }
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
- public void testCountTriggerPolicyStartValues() {
-
- List tuples = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
-
- // Test count of different sizes (0..9)
- for (int i = 0; i < 10; i++) {
-
- // Test different start values (-5..5)
- for (int j = -5; i < 6; i++) {
- TriggerPolicy triggerPolicy = new CountTriggerPolicy(i, j);
- // Add tuples without trigger
- for (int k = 0; k < ((i - j > 0) ? i - j : 0); k++) {
- assertFalse("Triggerpolicy with count of " + i + " and start value of " + j
- + " triggered at add nr. " + (k + 1),
- triggerPolicy.notifyTrigger(tuples.get(k % 10)));
- }
- // Expect trigger
- assertTrue("Triggerpolicy with count of " + i + "and start value of " + j
- + " did not trigger at the expected position.",
- triggerPolicy.notifyTrigger(tuples.get(0)));
- }
- }
- }
-
- @Test
- public void equalityTest() {
- assertEquals(new CountTriggerPolicy<Integer>(5, 5), new CountTriggerPolicy<Integer>(5, 5));
-
- assertEquals(new CountTriggerPolicy<Integer>(5, 5), new CountTriggerPolicy<Integer>(5, 5));
- assertEquals(new CountTriggerPolicy<Integer>(5), new CountTriggerPolicy<Integer>(5));
-
- assertNotEquals(new CountTriggerPolicy<Integer>(4, 5),
- new CountTriggerPolicy<Integer>(5, 5));
- assertNotEquals(new CountTriggerPolicy<Integer>(5, 5),
- new CountTriggerPolicy<Integer>(5, 4));
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicyTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicyTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicyTest.java
deleted file mode 100644
index 9ec4644..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicyTest.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
-import org.junit.Test;
-
-import java.util.List;
-import java.util.Arrays;
-
-import static org.junit.Assert.*;
-
-public class DeltaPolicyTest {
-
- //Dummy serializer, this is not used because the tests are done locally
- private final static TypeSerializer<Tuple2<Integer, Integer>> SERIALIZER = null;
-
- @SuppressWarnings({ "serial", "unchecked", "rawtypes" })
- @Test
- public void testDelta() {
- DeltaPolicy deltaPolicy = new DeltaPolicy(new DeltaFunction<Tuple2<Integer, Integer>>() {
- @Override
- public double getDelta(Tuple2<Integer, Integer> oldDataPoint,
- Tuple2<Integer, Integer> newDataPoint) {
- return (double) newDataPoint.f0 - oldDataPoint.f0;
- }
- }, new Tuple2(0, 0), 2, SERIALIZER);
-
- List<Tuple2> tuples = Arrays.asList(new Tuple2(1, 0), new Tuple2(2, 0), new Tuple2(3, 0),
- new Tuple2(6, 0));
-
- assertFalse(deltaPolicy.notifyTrigger(tuples.get(0)));
- assertEquals(0, deltaPolicy.notifyEviction(tuples.get(0), false, 0));
-
- assertFalse(deltaPolicy.notifyTrigger(tuples.get(1)));
- assertEquals(0, deltaPolicy.notifyEviction(tuples.get(1), false, 1));
-
- assertTrue(deltaPolicy.notifyTrigger(tuples.get(2)));
- assertEquals(1, deltaPolicy.notifyEviction(tuples.get(2), true, 2));
-
- assertTrue(deltaPolicy.notifyTrigger(tuples.get(3)));
- assertEquals(2, deltaPolicy.notifyEviction(tuples.get(3), true, 2));
- }
-
- @Test
- public void testEquality() {
-
- DeltaFunction<Tuple2<Integer, Integer>> df = new DeltaFunction<Tuple2<Integer, Integer>>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public double getDelta(Tuple2<Integer, Integer> oldDataPoint,
- Tuple2<Integer, Integer> newDataPoint) {
- return (double) newDataPoint.f0 - oldDataPoint.f0;
- }
- };
-
- assertEquals(new DeltaPolicy<Tuple2<Integer, Integer>>(df, new Tuple2<Integer, Integer>(0,
- 0), 2, SERIALIZER), new DeltaPolicy<Tuple2<Integer, Integer>>(df, new Tuple2<Integer, Integer>(
- 0, 0), 2, SERIALIZER));
-
- assertNotEquals(new DeltaPolicy<Tuple2<Integer, Integer>>(df, new Tuple2<Integer, Integer>(
- 0, 1), 2, SERIALIZER), new DeltaPolicy<Tuple2<Integer, Integer>>(df,
- new Tuple2<Integer, Integer>(0, 0), 2, SERIALIZER));
-
- assertNotEquals(new DeltaPolicy<Tuple2<Integer, Integer>>(df, new Tuple2<Integer, Integer>(0,
- 0), 2, SERIALIZER), new DeltaPolicy<Tuple2<Integer, Integer>>(df, new Tuple2<Integer, Integer>(
- 0, 0), 3, SERIALIZER));
-
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/MultiEvictionPolicyTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/MultiEvictionPolicyTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/MultiEvictionPolicyTest.java
deleted file mode 100644
index 3214aa7..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/MultiEvictionPolicyTest.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.junit.Test;
-
-public class MultiEvictionPolicyTest {
-
- private final List<Integer> tuples = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
-
- private final CloneableEvictionPolicy<Integer> evictionPolicy1 = new CountEvictionPolicy<Integer>(
- 7, 3);
- private final CloneableEvictionPolicy<Integer> evictionPolicy2 = new CountEvictionPolicy<Integer>(
- 3, 1);
- private final CloneableEvictionPolicy<Integer> evictionPolicy3 = new CountEvictionPolicy<Integer>(
- 5, 2);
-
- private final CloneableEvictionPolicy<Integer> activeEvictionPolicy1 = new ActiveCloneableEvictionPolicyWrapper<Integer>(
- evictionPolicy1);
- private final CloneableEvictionPolicy<Integer> activeEvictionPolicy2 = new ActiveCloneableEvictionPolicyWrapper<Integer>(
- evictionPolicy2);
- private final CloneableEvictionPolicy<Integer> activeEvictionPolicy3 = new ActiveCloneableEvictionPolicyWrapper<Integer>(
- evictionPolicy3);
-
- // From policies specified above the expected output is:
- // 1.: 0000000300
- // 2.: 0001111111
- // 3.: 0000020202
- private final Integer[] maxResult = { 0, 0, 0, 1, 1, 2, 1, 3, 1, 2 };
- private final Integer[] minResult = { 0, 0, 0, 0, 0, 0, 0, 1, 0, 0 };
- private final Integer[] sumResult = { 0, 0, 0, 1, 1, 3, 1, 6, 1, 3 };
- private final Integer[] priorityResult = { 0, 0, 0, 1, 1, 1, 1, 3, 1, 1 };
-
- /*
- * Test cases for not active policies
- */
-
- @Test
- public void notActiveEvictionMAXStrategyTest() {
- runNotActiveEvictionTest(MultiEvictionPolicy.EvictionStrategy.MAX, maxResult);
- }
-
- @Test
- public void notActiveEvictionMINStrategyTest() {
- runNotActiveEvictionTest(MultiEvictionPolicy.EvictionStrategy.MIN, minResult);
- }
-
- @Test
- public void notActiveEvictionSUMStrategyTest() {
- runNotActiveEvictionTest(MultiEvictionPolicy.EvictionStrategy.SUM, sumResult);
- }
-
- @Test
- public void notActiveEvictionPRIORITYStrategyTest() {
- runNotActiveEvictionTest(MultiEvictionPolicy.EvictionStrategy.PRIORITY, priorityResult);
- }
-
- /*
- * Test cases for active policies
- */
-
- @Test
- public void activeEvictionMAXStrategyTest() {
- runActiveEvictionTest(MultiEvictionPolicy.EvictionStrategy.MAX, maxResult);
- }
-
- @Test
- public void activeEvictionMINStrategyTest() {
- runActiveEvictionTest(MultiEvictionPolicy.EvictionStrategy.MIN, minResult);
- }
-
- @Test
- public void activeEvictionSUMStrategyTest() {
- runActiveEvictionTest(MultiEvictionPolicy.EvictionStrategy.SUM, sumResult);
- }
-
- @Test
- public void activeEvictionPRIORITYStrategyTest() {
- runActiveEvictionTest(MultiEvictionPolicy.EvictionStrategy.PRIORITY, priorityResult);
- }
-
- /**
- * Helper method: It runs the test with the given input using the not active
- * policies and applies the strategy defined in the parameter.
- *
- * @param strategy
- * the eviction strategy to be used
- * @param expectedResult
- * the result we expect
- */
- private void runNotActiveEvictionTest(MultiEvictionPolicy.EvictionStrategy strategy,
- Integer[] expectedResult) {
- @SuppressWarnings("unchecked")
- MultiEvictionPolicy<Integer> multiEviction = new MultiEvictionPolicy<Integer>(strategy,
- evictionPolicy1.clone(), evictionPolicy2.clone(), evictionPolicy3.clone());
-
- List<Integer> result = new LinkedList<Integer>();
-
- int buffersize = 0;
- for (Integer tuple : tuples) {
- // The buffer size should not matter, but we keep it for the case of
- // later policy changes.
- // The trigger does not matter. Always set it to false.
- int eviction = multiEviction.notifyEviction(tuple, false, buffersize);
- buffersize -= eviction;
- result.add(eviction);
-
- if (buffersize < 0) {
- buffersize = 0;
- }
-
- buffersize++;
- }
-
- arrayEqualityCheck(expectedResult, result.toArray());
- }
-
- /**
- * Helper method: It runs the test with the given input using the active
- * policies and applies the strategy defined in the parameter.
- *
- * @param strategy
- * the eviction strategy to be used
- * @param expectedResult
- * the result we expect
- */
- private void runActiveEvictionTest(MultiEvictionPolicy.EvictionStrategy strategy,
- Integer[] expectedResult) {
- @SuppressWarnings("unchecked")
- MultiEvictionPolicy<Integer> multiEviction = new MultiEvictionPolicy<Integer>(strategy,
- activeEvictionPolicy1.clone(), activeEvictionPolicy2.clone(),
- activeEvictionPolicy3.clone());
-
- List<Integer> result = new LinkedList<Integer>();
-
- int buffersize = 0;
- for (Integer tuple : tuples) {
- // The buffer size should not matter, but we keep it for the case of
- // later policy changes.
- // The trigger does not matter. Always set it to false.
- int eviction = multiEviction.notifyEvictionWithFakeElement(tuple, buffersize);
- buffersize -= eviction;
- result.add(eviction);
-
- if (buffersize < 0) {
- buffersize = 0;
- }
-
- buffersize++;
- }
-
- arrayEqualityCheck(expectedResult, result.toArray());
- }
-
- private void arrayEqualityCheck(Object[] array1, Object[] array2) {
- assertEquals(
- "The result arrays must have the same length. (Expected: " + Arrays.asList(array1)
- + "; Actual: " + Arrays.asList(array2) + ")", array1.length, array2.length);
- for (int i = 0; i < array1.length; i++) {
- assertEquals("Unequal fields at position " + i + "(Expected: " + Arrays.asList(array1)
- + "; Actual: " + Arrays.asList(array2) + ")", array1[i], array2[i]);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/MultiTriggerPolicyTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/MultiTriggerPolicyTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/MultiTriggerPolicyTest.java
deleted file mode 100644
index 4448b59..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/MultiTriggerPolicyTest.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-import com.google.common.collect.Sets;
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class MultiTriggerPolicyTest {
-
- /**
- * This constant defines the timeout for the test of the start ups of the
- * active trigger policy Threads.
- */
- private static final int TIMEOUT = 120000;
-
- // Use this to increase the timeout to be as long as possible.
- // private static final int TIMEOUT=Integer.MAX_VALUE;
-
- /**
- * This test covers all regular notify call. It takes no fake elements into
- * account.
- */
- @Test
- public void testWithoutActivePolicies() {
- List<Integer> tuples = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
- 16);
-
- TriggerPolicy<Integer> firstPolicy = new CountTriggerPolicy<Integer>(3);
- TriggerPolicy<Integer> secondPolicy = new CountTriggerPolicy<Integer>(5);
- TriggerPolicy<Integer> thirdPolicy = new CountTriggerPolicy<Integer>(8);
- @SuppressWarnings("unchecked")
- TriggerPolicy<Integer> multiTrigger = new MultiTriggerPolicy<Integer>(firstPolicy,
- secondPolicy, thirdPolicy);
-
- // From above policies the expected output is (first element is 0):
- // first: 3, 6, 9, 12, 15,...
- // second: 5, 10, 15,...
- // third: 8, 16, 24,...
- // combination: 3,5,6,8,9,10,12,15,16
- List<Integer> expectedResult = Arrays.asList(3, 5, 6, 8, 9, 10, 12, 15, 16);
- List<Integer> actualResult = new LinkedList<Integer>();
-
- for (int i = 0; i < tuples.size(); i++) {
- if (multiTrigger.notifyTrigger(tuples.get(i))) {
- actualResult.add(i);
- }
- }
-
- // check equal sizes
- assertTrue("The expected result list and the actual result list must have the same size,"
- + " but they are different. (expected: " + expectedResult.size() + "; actual: "
- + actualResult.size() + "). Actual result is: " + actualResult
- + " Expected result is: " + expectedResult,
- expectedResult.size() == actualResult.size());
-
- // check equal elements within result list/expected list
- for (int i = 0; i < expectedResult.size(); i++) {
- assertTrue("The actual and the expected result does not match at position " + i
- + ". (expected: " + expectedResult.get(i) + "; actual: " + actualResult.get(i)
- + "). Actual result is: " + actualResult + " Expected result is: "
- + expectedResult, expectedResult.get(i) == actualResult.get(i));
- }
- }
-
- /**
- * This test covers the pre-notify calls to active policies. I takes no
- * regular notify into account.
- */
- @Test
- public void testWithActivePolicies() {
-
- // create some test data
- Integer[] times = { 1, 3, 20, 26 };
-
- // create a timestamp
- @SuppressWarnings("serial")
- Timestamp<Integer> timeStamp = new Timestamp<Integer>() {
-
- @Override
- public long getTimestamp(Integer value) {
- return value;
- }
-
- };
-
- // create policy
- TimeTriggerPolicy<Integer> firstPolicy = new TimeTriggerPolicy<Integer>(5,
- new TimestampWrapper<Integer>(timeStamp, 0));
- TimeTriggerPolicy<Integer> secondPolicy = new TimeTriggerPolicy<Integer>(10,
- new TimestampWrapper<Integer>(timeStamp, 0));
- TimeTriggerPolicy<Integer> thirdPolicy = new TimeTriggerPolicy<Integer>(22,
- new TimestampWrapper<Integer>(timeStamp, 0));
- @SuppressWarnings("unchecked")
- MultiTriggerPolicy<Integer> multiTrigger = new MultiTriggerPolicy<Integer>(firstPolicy,
- secondPolicy, thirdPolicy);
-
- // expected result
- // Long[][] result1 = { {}, {}, { 4L, 9L, 14L, 19L }, { 24L } };
- // Long[][] result2 = { {}, {}, { 9L, 19L }, { } };
- // Long[][] result3 = { {}, {}, { }, { 21L } };
- Long[][] result = { {}, {}, { 4L, 9L, 14L, 19L, 9L, 19L }, { 24L, 21L } };
-
- // call policy
- for (int i = 0; i < times.length; i++) {
- arrayEqualityCheck(result[i], multiTrigger.preNotifyTrigger(times[i]));
- multiTrigger.notifyTrigger(times[i]);
- }
- }
-
- /**
- * This test verifies, that nestet active trigger runnables are started
- * correctly.
- */
- @Test
- public void testActiveTriggerRunnables() throws InterruptedException {
- TriggerPolicy<Integer> firstPolicy = new ActiveTriggerWithRunnable(1);
- TriggerPolicy<Integer> secondPolicy = new ActiveTriggerWithRunnable(2);
- TriggerPolicy<Integer> thirdPolicy = new ActiveTriggerWithRunnable(3);
- @SuppressWarnings("unchecked")
- ActiveTriggerPolicy<Integer> multiTrigger = new MultiTriggerPolicy<Integer>(firstPolicy,
- secondPolicy, thirdPolicy);
-
- MyCallbackClass cb = new MyCallbackClass(3);
- Runnable runnable = multiTrigger.createActiveTriggerRunnable(cb);
- new Thread(runnable).start();
-
- assertTrue("Even after " + TIMEOUT + "ms not all active policy runnables were started.",
- cb.check(TIMEOUT, 1, 2, 3));
- }
-
- private void arrayEqualityCheck(Object[] array1, Object[] array2) {
- assertEquals(
- "The result arrays must have the same length. (Expected: " + Arrays.asList(array1)
- + "; Actual: " + Arrays.asList(array2) + ")", array1.length, array2.length);
- for (int i = 0; i < array1.length; i++) {
- assertEquals("Unequal fields at position " + i + "(Expected: " + Arrays.asList(array1)
- + "; Actual: " + Arrays.asList(array2) + ")", array1[i], array2[i]);
- }
- }
-
- /**
- * This helper class is used to simulate active triggers which produce own
- * runnables.
- */
- @SuppressWarnings("serial")
- private class ActiveTriggerWithRunnable implements ActiveTriggerPolicy<Integer> {
-
- private final int id;
-
- public ActiveTriggerWithRunnable(int id) {
- this.id = id;
- }
-
- @Override
- public boolean notifyTrigger(Integer datapoint) {
- // This method is not uses for any test case
- return false;
- }
-
- @Override
- public Object[] preNotifyTrigger(Integer datapoint) {
- // This method is not used for any test case
- return null;
- }
-
- @Override
- public Runnable createActiveTriggerRunnable(final ActiveTriggerCallback callback) {
- return new Runnable() {
- @Override
- public void run() {
- callback.sendFakeElement(id);
- }
- };
- }
- }
-
- /**
- * This callback class is used to checked whether all nested policy runnable
- * started up.
- */
- private class MyCallbackClass implements ActiveTriggerCallback {
-
- private final Set<Integer> received = Sets
- .newSetFromMap(new ConcurrentHashMap<Integer, Boolean>());
-
- private final CountDownLatch sync;
-
- public MyCallbackClass(int numberOfExpectedElements) {
- checkArgument(numberOfExpectedElements >= 0);
- this.sync = new CountDownLatch(numberOfExpectedElements);
- }
-
- @Override
- public void sendFakeElement(Object datapoint) {
- received.add((Integer) datapoint);
-
- sync.countDown();
- }
-
- public boolean check(int timeout, int... expectedIds) throws InterruptedException {
- // Wait for all elements
- sync.await(timeout, TimeUnit.MILLISECONDS);
-
- // Check received all expected ids
- assertEquals(expectedIds.length, received.size());
-
- for (int id : expectedIds) {
- if (!received.contains(id)) {
- return false;
- }
- }
-
- return true;
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/PunctuationPolicyTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/PunctuationPolicyTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/PunctuationPolicyTest.java
deleted file mode 100644
index fda9cd3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/PunctuationPolicyTest.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.windowing.extractor.Extractor;
-import org.apache.flink.streaming.api.windowing.extractor.FieldFromTuple;
-import org.junit.Test;
-
-public class PunctuationPolicyTest {
-
- // This value should not effect the policy. It is changed at each call to
- // verify this.
- private boolean triggered = false;
-
- @Test
- public void PunctuationTriggerTestWithoutExtraction() {
- PunctuationPolicy<Object, Object> policy = new PunctuationPolicy<Object, Object>(
- new TestObject(0));
- assertTrue("The present punctuation was not detected. (POS 1)",
- policy.notifyTrigger(new TestObject(0)));
- assertFalse("There was a punctuation detected which wasn't present. (POS 2)",
- policy.notifyTrigger(new TestObject(1)));
- policy.toString();
- }
-
- @Test
- public void PunctuationTriggerTestWithExtraction() {
- @SuppressWarnings({ "unchecked", "rawtypes" })
- PunctuationPolicy<Tuple2<Object, Object>, Object> policy = new PunctuationPolicy<Tuple2<Object, Object>, Object>(
- new TestObject(0), new FieldFromTuple(0));
- assertTrue("The present punctuation was not detected. (POS 3)",
- policy.notifyTrigger(new Tuple2<Object, Object>(new TestObject(0),
- new TestObject(1))));
- assertFalse("There was a punctuation detected which wasn't present. (POS 4)",
- policy.notifyTrigger(new Tuple2<Object, Object>(new TestObject(1),
- new TestObject(0))));
- }
-
- @Test
- public void PunctuationEvictionTestWithoutExtraction() {
- // The current buffer size should not effect the test. It's therefore
- // always 0 here.
-
- PunctuationPolicy<Object, Object> policy = new PunctuationPolicy<Object, Object>(
- new TestObject(0));
- assertEquals(
- "The present punctuation was not detected or the number of deleted tuples was wrong. (POS 5)",
- 0, policy.notifyEviction(new TestObject(0), (triggered = !triggered), 0));
- for (int i = 0; i < 10; i++) {
- for (int j = 0; j < i; j++) {
- assertEquals("There was a punctuation detected which wasn't present. (POS 6)", 0,
- policy.notifyEviction(new TestObject(1), (triggered = !triggered), 0));
- }
- assertEquals(
- "The present punctuation was not detected or the number of deleted tuples was wrong. (POS 7)",
- i + 1, policy.notifyEviction(new TestObject(0), (triggered = !triggered), 0));
- }
- }
-
- @Test
- public void PunctuationEvictionTestWithExtraction() {
- // The current buffer size should not effect the test. It's therefore
- // always 0 here.
-
- @SuppressWarnings({ "unchecked", "rawtypes" })
- PunctuationPolicy<Tuple2<Object, Object>, Object> policy = new PunctuationPolicy<Tuple2<Object, Object>, Object>(
- new TestObject(0), new FieldFromTuple(0));
- assertEquals(
- "The present punctuation was not detected or the number of deleted tuples was wrong. (POS 10)",
- 0, policy.notifyEviction(new Tuple2<Object, Object>(new TestObject(0),
- new TestObject(1)), (triggered = !triggered), 0));
- for (int i = 0; i < 10; i++) {
- for (int j = 0; j < i; j++) {
- assertEquals("There was a punctuation detected which wasn't present. (POS 9)", 0,
- policy.notifyEviction(new Tuple2<Object, Object>(new TestObject(1),
- new TestObject(0)), (triggered = !triggered), 0));
- }
- assertEquals(
- "The present punctuation was not detected or the number of deleted tuples was wrong. (POS 10)",
- i + 1, policy.notifyEviction(new Tuple2<Object, Object>(new TestObject(0),
- new TestObject(1)), (triggered = !triggered), 0));
- }
- }
-
- @Test
- public void testEquals() {
- Extractor<Integer, Integer> extractor = new Extractor<Integer, Integer>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Integer extract(Integer in) {
- return in;
- }
- };
-
- assertEquals(new PunctuationPolicy<Integer, Integer>(4),
- new PunctuationPolicy<Integer, Integer>(4));
- assertNotEquals(new PunctuationPolicy<Integer, Integer>(4),
- new PunctuationPolicy<Integer, Integer>(5));
-
- assertNotEquals(new PunctuationPolicy<Integer, Integer>(4, extractor),
- new PunctuationPolicy<Integer, Integer>(4));
-
- assertEquals(new PunctuationPolicy<Integer, Integer>(4, extractor),
- new PunctuationPolicy<Integer, Integer>(4, extractor));
-
- assertNotEquals(new PunctuationPolicy<Integer, Integer>(4),
- new PunctuationPolicy<Integer, Integer>(4, extractor));
-
- }
-
- private class TestObject {
-
- private int id;
-
- public TestObject(int id) {
- this.id = id;
- }
-
- @Override
- public boolean equals(Object o) {
- if (o instanceof TestObject && ((TestObject) o).getId() == this.id) {
- return true;
- } else {
- return false;
- }
- }
-
- public int getId() {
- return id;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicyTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicyTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicyTest.java
deleted file mode 100644
index 08a5c32..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicyTest.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.LinkedList;
-
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-import org.junit.Test;
-
-public class TimeEvictionPolicyTest {
-
- @Test
- public void timeEvictionTest() {
- // create some test data
- Integer[] times = { 1, 3, 4, 6, 7, 9, 14, 20, 21, 22, 30, 31, 33, 36, 40, 41, 42, 43, 44,
- 45, 47, 55 };
- Integer[] numToDelete = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 2, 0, 0, 0, 0, 3 };
-
- // create a timestamp
- @SuppressWarnings("serial")
- Timestamp<Integer> timeStamp = new Timestamp<Integer>() {
-
- @Override
- public long getTimestamp(Integer value) {
- return value;
- }
-
- };
-
- // test different granularity
- for (long granularity = 0; granularity < 40; granularity++) {
- // create policy
- TimeEvictionPolicy<Integer> policy = new TimeEvictionPolicy<Integer>(granularity,
- new TimestampWrapper<Integer>(timeStamp, 0));
-
- // The trigger status should not effect the policy. Therefore, it's
- // value is changed after each usage.
- boolean triggered = false;
-
- // The eviction should work similar with both, fake and real
- // elements. Which kind is used is changed on every 3rd element in
- // this test.
- int fakeAndRealCounter = 0;
- boolean fake = false;
-
- // test by adding values
- LinkedList<Integer> buffer = new LinkedList<Integer>();
- for (int i = 0; i < times.length; i++) {
-
- // check if the current element should be a fake
- fakeAndRealCounter++;
- if (fakeAndRealCounter > 2) {
- fake = !fake;
- fakeAndRealCounter = 0;
- }
-
- int result;
-
- if (fake) {
- // Notify eviction with fake element
- result = policy.notifyEvictionWithFakeElement(times[i], buffer.size());
- } else {
- // Notify eviction with real element
- result = policy.notifyEviction(times[i], (triggered = !triggered),
- buffer.size());
- }
-
- // handle correctness of eviction
- for (; result > 0 && !buffer.isEmpty(); result--) {
- if (buffer.getFirst() <= times[i] - granularity) {
- buffer.removeFirst();
- } else {
- fail("The policy wanted to evict time " + buffer.getFirst()
- + " while the current time was " + times[i]
- + "and the granularity was " + granularity);
- }
- }
-
- // test that all required evictions have been done
- if (!buffer.isEmpty()) {
- assertTrue("The policy did not evict " + buffer.getFirst()
- + " while the current time was " + times[i]
- + " and the granularity was " + granularity,
- (buffer.getFirst() >= times[i] - granularity));
- }
-
- // test influence of other evictions
- for (int j = numToDelete[i % numToDelete.length]; j > 0; j--) {
- if (!buffer.isEmpty()) {
- buffer.removeFirst();
- }
- }
-
- // add current element to buffer if it is no fake
- if (!fake) {
- buffer.add(times[i]);
- }
-
- }
- }
- }
-
- @Test
- public void equalsTest() {
-
- @SuppressWarnings("serial")
- Timestamp<Integer> timeStamp = new Timestamp<Integer>() {
-
- @Override
- public long getTimestamp(Integer value) {
- return value;
- }
-
- };
-
- @SuppressWarnings("serial")
- Timestamp<Integer> timeStamp2 = new Timestamp<Integer>() {
-
- @Override
- public long getTimestamp(Integer value) {
- return value;
- }
-
- };
-
- assertEquals(
- new TimeEvictionPolicy<Integer>(5, new TimestampWrapper<Integer>(timeStamp, 0)),
- new TimeEvictionPolicy<Integer>(5, new TimestampWrapper<Integer>(timeStamp, 0)));
-
- assertNotEquals(new TimeEvictionPolicy<Integer>(5, new TimestampWrapper<Integer>(timeStamp,
- 0)), new TimeEvictionPolicy<Integer>(5,
- new TimestampWrapper<Integer>(timeStamp2, 0)));
-
- assertNotEquals(new TimeEvictionPolicy<Integer>(5, new TimestampWrapper<Integer>(timeStamp,
- 0)),
- new TimeEvictionPolicy<Integer>(2, new TimestampWrapper<Integer>(timeStamp, 0)));
-
- assertNotEquals(new TimeEvictionPolicy<Integer>(5, new TimestampWrapper<Integer>(timeStamp,
- 0)),
- new TimeEvictionPolicy<Integer>(5, new TimestampWrapper<Integer>(timeStamp, 3)));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
deleted file mode 100644
index 5b26854..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.flink.streaming.api.windowing.helper.SystemTimestamp;
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-import org.junit.Test;
-
-public class TimeTriggerPolicyTest {
-
- @Test
- public void timeTriggerRegularNotifyTest() {
- // create some test data
- Integer[] times = { 1, 3, 4, 6, 7, 9, 14, 20, 21, 22, 30 };
-
- // create a timestamp
- @SuppressWarnings("serial")
- Timestamp<Integer> timeStamp = new Timestamp<Integer>() {
-
- @Override
- public long getTimestamp(Integer value) {
- return value;
- }
-
- };
-
- // test different granularity
- for (long granularity = 0; granularity < 31; granularity++) {
- // create policy
-
- TriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(granularity,
- new TimestampWrapper<Integer>(timeStamp, 0));
-
- // remember window border
- long currentTime = 0;
-
- // test by adding values
- for (int i = 0; i < times.length; i++) {
- boolean result = policy.notifyTrigger(times[i]);
- // start time is included, but end time is excluded: >=
- if (times[i] >= currentTime + granularity) {
- if (granularity != 0) {
- currentTime = times[i] - ((times[i] - currentTime) % granularity);
- }
- assertTrue("The policy did not trigger at pos " + i + " (current time border: "
- + currentTime + "; current granularity: " + granularity
- + "; data point time: " + times[i] + ")", result);
- } else {
- assertFalse("The policy triggered wrong at pos " + i
- + " (current time border: " + currentTime + "; current granularity: "
- + granularity + "; data point time: " + times[i] + ")", result);
- }
- }
- }
-
- }
-
- @Test
- public void equalsTest() {
-
- @SuppressWarnings("serial")
- Timestamp<Integer> timeStamp = new Timestamp<Integer>() {
-
- @Override
- public long getTimestamp(Integer value) {
- return value;
- }
-
- };
-
- @SuppressWarnings("serial")
- Timestamp<Integer> timeStamp2 = new Timestamp<Integer>() {
-
- @Override
- public long getTimestamp(Integer value) {
- return value;
- }
-
- };
-
- assertEquals(
- new TimeTriggerPolicy<Integer>(5, new TimestampWrapper<Integer>(timeStamp, 0)),
- new TimeTriggerPolicy<Integer>(5, new TimestampWrapper<Integer>(timeStamp, 0)));
-
- assertNotEquals(new TimeTriggerPolicy<Integer>(5, new TimestampWrapper<Integer>(timeStamp,
- 0)),
- new TimeTriggerPolicy<Integer>(5, new TimestampWrapper<Integer>(timeStamp2, 0)));
-
- assertNotEquals(new TimeTriggerPolicy<Integer>(5, new TimestampWrapper<Integer>(timeStamp,
- 0)), new TimeTriggerPolicy<Integer>(2, new TimestampWrapper<Integer>(timeStamp, 0)));
-
- assertNotEquals(new TimeTriggerPolicy<Integer>(5, new TimestampWrapper<Integer>(timeStamp,
- 0)), new TimeTriggerPolicy<Integer>(5, new TimestampWrapper<Integer>(timeStamp, 3)));
-
- assertEquals(SystemTimestamp.getWrapper(), SystemTimestamp.getWrapper());
- }
-
- @Test
- public void timeTriggerPreNotifyTest() {
- // create some test data
- Integer[] times = { 1, 3, 20, 26 };
-
- // create a timestamp
- @SuppressWarnings("serial")
- Timestamp<Integer> timeStamp = new Timestamp<Integer>() {
-
- @Override
- public long getTimestamp(Integer value) {
- return value;
- }
-
- };
-
- // create policy
- TimeTriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(5,
- new TimestampWrapper<Integer>(timeStamp, 0));
-
- // expected result
- Long[][] result = { {}, {}, { 4L, 9L, 14L, 19L }, { 24L } };
-
- // call policy
- for (int i = 0; i < times.length; i++) {
- arrayEqualityCheck(result[i], policy.preNotifyTrigger(times[i]));
- policy.notifyTrigger(times[i]);
- }
- }
-
- private void arrayEqualityCheck(Object[] array1, Object[] array2) {
- assertEquals("The result arrays must have the same length", array1.length, array2.length);
- for (int i = 0; i < array1.length; i++) {
- assertEquals("Unequal fields at position " + i, array1[i], array2[i]);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TumblingEvictionPolicyTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TumblingEvictionPolicyTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TumblingEvictionPolicyTest.java
deleted file mode 100644
index a3a7d73..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TumblingEvictionPolicyTest.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-import static org.junit.Assert.*;
-
-import org.junit.Test;
-
-public class TumblingEvictionPolicyTest {
-
- @Test
- public void testTumblingEviction() {
- EvictionPolicy<Integer> policy = new TumblingEvictionPolicy<Integer>();
-
- int counter = 0;
-
- for (int i = 0; i < 10; i++) {
- for (int j = 0; j < i; j++) {
- assertEquals(0, policy.notifyEviction(0, false, counter++));
- }
- assertEquals(counter, policy.notifyEviction(0, true, counter));
- counter = 1;
- }
-
- assertEquals(new TumblingEvictionPolicy<Integer>(), new TumblingEvictionPolicy<Integer>());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBufferTest.java
deleted file mode 100644
index 6bc0e30..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBufferTest.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.junit.Test;
-
-public class BasicWindowBufferTest {
-
- @Test
- public void testEmitWindow() throws Exception {
-
- TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
- List<StreamWindow<Integer>> collected = collector.getCollected();
-
- WindowBuffer<Integer> wb = new BasicWindowBuffer<Integer>();
-
- wb.store(2);
- wb.store(10);
-
- wb.emitWindow(collector);
-
- assertEquals(1, collected.size());
- assertEquals(StreamWindow.fromElements(2, 10), collected.get(0));
-
- wb.store(4);
- wb.evict(2);
-
- wb.emitWindow(collector);
-
- assertEquals(2, collected.size());
- assertEquals(StreamWindow.fromElements(4), collected.get(1));
-
- wb.evict(1);
-
- wb.emitWindow(collector);
- assertEquals(2, collected.size());
- }
-
- public static class TestOutput<T> implements Output<StreamRecord<T>> {
-
- private final List<T> collected = new ArrayList<T>();
-
- @Override
- public void collect(StreamRecord<T> record) {
- collected.add(record.getValue());
- }
-
- @Override
- public void close() {
- }
-
- public List<T> getCollected() {
- return collected;
- }
-
- @Override
- public void emitWatermark(Watermark mark) {
-
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountGroupedPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountGroupedPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountGroupedPreReducerTest.java
deleted file mode 100644
index 8430499..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountGroupedPreReducerTest.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.operators.Keys;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.util.keys.KeySelectorUtil;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput;
-
-import org.junit.Test;
-
-public class JumpingCountGroupedPreReducerTest {
-
- TypeInformation<Tuple2<Integer, Integer>> type = TypeExtractor
- .getForObject(new Tuple2<Integer, Integer>(1, 1));
- TypeSerializer<Tuple2<Integer, Integer>> serializer = type.createSerializer(null);
-
- KeySelector<Tuple2<Integer, Integer>, ?> key = KeySelectorUtil.getSelectorForKeys(
- new Keys.ExpressionKeys<Tuple2<Integer, Integer>>(new int[] { 0 }, type), type, null);
-
- Reducer reducer = new Reducer();
-
- @SuppressWarnings("unchecked")
- @Test
- public void testEmitWindow() throws Exception {
-
- List<Tuple2<Integer, Integer>> inputs = new ArrayList<Tuple2<Integer, Integer>>();
- inputs.add(new Tuple2<Integer, Integer>(1, 1));
- inputs.add(new Tuple2<Integer, Integer>(0, 0));
- inputs.add(new Tuple2<Integer, Integer>(1, -1));
- inputs.add(new Tuple2<Integer, Integer>(1, -2));
- inputs.add(new Tuple2<Integer, Integer>(100, -200));
-
- TestOutput<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestOutput<StreamWindow<Tuple2<Integer, Integer>>>();
- List<StreamWindow<Tuple2<Integer, Integer>>> collected = collector.getCollected();
-
- WindowBuffer<Tuple2<Integer, Integer>> wb = new JumpingCountGroupedPreReducer<Tuple2<Integer, Integer>>(
- reducer, key, serializer, 1);
-
- wb.store(serializer.copy(inputs.get(4)));
- wb.store(serializer.copy(inputs.get(0)));
- wb.store(serializer.copy(inputs.get(1)));
- wb.emitWindow(collector);
-
- assertEquals(1, collected.size());
-
- assertSetEquals(StreamWindow.fromElements(new Tuple2<Integer, Integer>(1, 1),
- new Tuple2<Integer, Integer>(0, 0)), collected.get(0));
-
- wb.store(serializer.copy(inputs.get(4)));
- wb.store(serializer.copy(inputs.get(0)));
- wb.store(serializer.copy(inputs.get(1)));
- wb.store(serializer.copy(inputs.get(2)));
-
- // Nothing should happen here
- wb.evict(3);
-
- wb.store(serializer.copy(inputs.get(3)));
-
- wb.emitWindow(collector);
-
- assertEquals(2, collected.size());
-
- assertSetEquals(StreamWindow.fromElements(new Tuple2<Integer, Integer>(3, -2),
- new Tuple2<Integer, Integer>(0, 0)), collected.get(1));
-
- // Test whether function is mutating inputs or not
- assertEquals(2, reducer.allInputs.size());
- assertEquals(reducer.allInputs.get(0), inputs.get(2));
- assertEquals(reducer.allInputs.get(1), inputs.get(3));
-
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testEmitWindow2() throws Exception {
-
- List<Tuple2<Integer, Integer>> inputs = new ArrayList<Tuple2<Integer, Integer>>();
- inputs.add(new Tuple2<Integer, Integer>(1, 1));
- inputs.add(new Tuple2<Integer, Integer>(0, 0));
- inputs.add(new Tuple2<Integer, Integer>(1, -1));
- inputs.add(new Tuple2<Integer, Integer>(1, -2));
- inputs.add(new Tuple2<Integer, Integer>(100, -200));
-
- TestOutput<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestOutput<StreamWindow<Tuple2<Integer, Integer>>>();
- List<StreamWindow<Tuple2<Integer, Integer>>> collected = collector.getCollected();
-
- WindowBuffer<Tuple2<Integer, Integer>> wb = new JumpingCountGroupedPreReducer<Tuple2<Integer, Integer>>(
- reducer, key, serializer, 1).sequentialID();
-
- wb.store(serializer.copy(inputs.get(4)));
- wb.store(serializer.copy(inputs.get(0)));
- wb.store(serializer.copy(inputs.get(1)));
- wb.emitWindow(collector);
-
- assertSetEquals(StreamWindow.fromElements(inputs.get(0), inputs.get(1)), collected.get(0));
-
- wb.store(serializer.copy(inputs.get(4)));
- wb.store(serializer.copy(inputs.get(0)));
- wb.store(serializer.copy(inputs.get(1)));
- wb.store(serializer.copy(inputs.get(2)));
- wb.emitWindow(collector);
-
- assertSetEquals(StreamWindow.fromElements(new Tuple2<Integer, Integer>(2, 0), inputs.get(1)), collected.get(1));
-
-
- }
-
- private static <T> void assertSetEquals(Collection<T> first, Collection<T> second) {
- assertEquals(new HashSet<T>(first), new HashSet<T>(second));
- }
-
- @SuppressWarnings("serial")
- private class Reducer implements ReduceFunction<Tuple2<Integer, Integer>> {
-
- public List<Tuple2<Integer, Integer>> allInputs = new ArrayList<Tuple2<Integer, Integer>>();
-
- @Override
- public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1,
- Tuple2<Integer, Integer> value2) throws Exception {
- allInputs.add(value2);
- value1.f0 = value1.f0 + value2.f0;
- value1.f1 = value1.f1 + value2.f1;
- return value1;
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountPreReducerTest.java
deleted file mode 100644
index 2279264..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountPreReducerTest.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput;
-import org.junit.Test;
-
-public class JumpingCountPreReducerTest {
-
- TypeSerializer<Tuple2<Integer, Integer>> serializer = TypeExtractor.getForObject(
- new Tuple2<Integer, Integer>(1, 1)).createSerializer(null);
-
- Reducer reducer = new Reducer();
-
- @SuppressWarnings("unchecked")
- @Test
- public void testEmitWindow() throws Exception {
-
- List<Tuple2<Integer, Integer>> inputs = new ArrayList<Tuple2<Integer, Integer>>();
- inputs.add(new Tuple2<Integer, Integer>(1, 1));
- inputs.add(new Tuple2<Integer, Integer>(2, 0));
- inputs.add(new Tuple2<Integer, Integer>(3, -1));
- inputs.add(new Tuple2<Integer, Integer>(4, -2));
- inputs.add(new Tuple2<Integer, Integer>(5, -3));
-
- TestOutput<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestOutput<StreamWindow<Tuple2<Integer, Integer>>>();
- List<StreamWindow<Tuple2<Integer, Integer>>> collected = collector.getCollected();
-
- WindowBuffer<Tuple2<Integer, Integer>> wb = new JumpingCountPreReducer<Tuple2<Integer, Integer>>(
- reducer, serializer, 2);
-
- wb.store(serializer.copy(inputs.get(0)));
- wb.store(serializer.copy(inputs.get(1)));
- wb.store(serializer.copy(inputs.get(2)));
- wb.store(serializer.copy(inputs.get(3)));
- wb.store(serializer.copy(inputs.get(4)));
-
- wb.emitWindow(collector);
-
- assertEquals(1, collected.size());
- assertEquals(StreamWindow.fromElements(new Tuple2<Integer, Integer>(12, -6)),
- collected.get(0));
-
- wb.store(serializer.copy(inputs.get(0)));
- wb.store(serializer.copy(inputs.get(1)));
- wb.store(serializer.copy(inputs.get(2)));
-
- // Nothing should happen here
- wb.evict(3);
-
- wb.store(serializer.copy(inputs.get(3)));
-
- wb.emitWindow(collector);
-
- assertEquals(2, collected.size());
- assertEquals(StreamWindow.fromElements(new Tuple2<Integer, Integer>(7, -3)),
- collected.get(1));
-
- // Test whether function is mutating inputs or not
- assertEquals(3, reducer.allInputs.size());
- assertEquals(reducer.allInputs.get(0), inputs.get(3));
- assertEquals(reducer.allInputs.get(1), inputs.get(4));
- assertEquals(reducer.allInputs.get(2), inputs.get(3));
- }
-
- @SuppressWarnings("serial")
- private class Reducer implements ReduceFunction<Tuple2<Integer, Integer>> {
-
- public List<Tuple2<Integer, Integer>> allInputs = new ArrayList<Tuple2<Integer, Integer>>();
-
- @Override
- public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1,
- Tuple2<Integer, Integer> value2) throws Exception {
- allInputs.add(value2);
- value1.f0 = value1.f0 + value2.f0;
- value1.f1 = value1.f1 + value2.f1;
- return value1;
- }
-
- }
-
-}