You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/12 06:11:03 UTC
[2/5] flink git commit: [FLINK-6491] [table] Add QueryConfig and
state clean up for over-windowed aggregates.
http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala
index 910cbf2..9da2c44 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala
@@ -18,11 +18,12 @@
package org.apache.flink.table.api.scala.stream.table
+import org.apache.flink.api.common.time.Time
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
-import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}
import org.apache.flink.table.api.scala.stream.utils.StreamITCase.RetractingSink
import org.apache.flink.types.Row
import org.junit.Assert.assertEquals
@@ -34,6 +35,8 @@ import scala.collection.mutable
* Tests of groupby (without window) aggregations
*/
class GroupAggregationsITCase extends StreamingWithStateTestBase {
+ private val queryConfig = new StreamQueryConfig()
+ queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2))
@Test
def testNonKeyedGroupAggregate(): Unit = {
@@ -45,7 +48,7 @@ class GroupAggregationsITCase extends StreamingWithStateTestBase {
val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
.select('a.sum, 'b.sum)
- val results = t.toRetractStream[Row]
+ val results = t.toRetractStream[Row](queryConfig)
results.addSink(new StreamITCase.RetractingSink).setParallelism(1)
env.execute()
@@ -64,7 +67,7 @@ class GroupAggregationsITCase extends StreamingWithStateTestBase {
.groupBy('b)
.select('b, 'a.sum)
- val results = t.toRetractStream[Row]
+ val results = t.toRetractStream[Row](queryConfig)
results.addSink(new StreamITCase.RetractingSink)
env.execute()
@@ -85,7 +88,7 @@ class GroupAggregationsITCase extends StreamingWithStateTestBase {
.groupBy('cnt)
.select('cnt, 'b.count as 'freq)
- val results = t.toRetractStream[Row]
+ val results = t.toRetractStream[Row](queryConfig)
results.addSink(new RetractingSink)
env.execute()
@@ -104,7 +107,7 @@ class GroupAggregationsITCase extends StreamingWithStateTestBase {
.groupBy('e, 'b % 3)
.select('c.min, 'e, 'a.avg, 'd.count)
- val results = t.toRetractStream[Row]
+ val results = t.toRetractStream[Row](queryConfig)
results.addSink(new RetractingSink)
env.execute()
http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala
deleted file mode 100644
index eadcfc8..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala
+++ /dev/null
@@ -1,336 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.runtime.aggregate
-
-import java.util.Comparator
-import java.util.concurrent.ConcurrentLinkedQueue
-import java.lang.{Integer => JInt, Long => JLong}
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.java.functions.KeySelector
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.streaming.api.operators.KeyedProcessOperator
-import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
-import org.apache.flink.streaming.util.{KeyedOneInputStreamOperatorTestHarness, TestHarnessUtil}
-import org.apache.flink.table.codegen.GeneratedAggregationsFunction
-import org.apache.flink.table.functions.AggregateFunction
-import org.apache.flink.table.functions.aggfunctions.{LongMaxWithRetractAggFunction, LongMinWithRetractAggFunction}
-import org.apache.flink.table.runtime.aggregate.BoundedProcessingOverRangeProcessFunctionTest._
-import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
-import org.apache.flink.types.Row
-import org.junit.Test
-
-class BoundedProcessingOverRangeProcessFunctionTest {
-
- @Test
- def testProcTimePartitionedOverRange(): Unit = {
-
- val rT = new CRowTypeInfo(new RowTypeInfo(Array[TypeInformation[_]](
- INT_TYPE_INFO,
- LONG_TYPE_INFO,
- INT_TYPE_INFO,
- STRING_TYPE_INFO,
- LONG_TYPE_INFO),
- Array("a", "b", "c", "d", "e")))
-
- val aggregates =
- Array(new LongMinWithRetractAggFunction,
- new LongMaxWithRetractAggFunction).asInstanceOf[Array[AggregateFunction[_, _]]]
- val aggregationStateType: RowTypeInfo = AggregateUtil.createAccumulatorRowType(aggregates)
-
- val funcCode =
- """
- |public class BoundedOverAggregateHelper$33
- | extends org.apache.flink.table.runtime.aggregate.GeneratedAggregations {
- |
- | transient org.apache.flink.table.functions.aggfunctions.LongMinWithRetractAggFunction
- | fmin = null;
- |
- | transient org.apache.flink.table.functions.aggfunctions.LongMaxWithRetractAggFunction
- | fmax = null;
- |
- | public BoundedOverAggregateHelper$33() throws Exception {
- |
- | fmin = (org.apache.flink.table.functions.aggfunctions.LongMinWithRetractAggFunction)
- | org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
- | .deserialize("rO0ABXNyAEtvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbn" +
- | "MuTG9uZ01pbldpdGhSZXRyYWN0QWdnRnVuY3Rpb26oIdX_DaMPxQIAAHhyAEdvcmcuYXBhY2hlLmZsaW5rL" +
- | "nRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbnMuTWluV2l0aFJldHJhY3RBZ2dGdW5jdGlvbq_ZGuzxtA_S" +
- | "AgABTAADb3JkdAAVTHNjYWxhL21hdGgvT3JkZXJpbmc7eHIAMm9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnV" +
- | "uY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uTcYVPtJjNfwCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS" +
- | "5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvbi0B91QxuAyTAgAAeHBzcgAZc2NhbGEubWF0aC5PcmRlc" +
- | "mluZyRMb25nJOda0iCPo2ukAgAAeHA");
- |
- | fmax = (org.apache.flink.table.functions.aggfunctions.LongMaxWithRetractAggFunction)
- | org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
- | .deserialize("rO0ABXNyAEtvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbn" +
- | "MuTG9uZ01heFdpdGhSZXRyYWN0QWdnRnVuY3Rpb25RmsI8azNGXwIAAHhyAEdvcmcuYXBhY2hlLmZsaW5rL" +
- | "nRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbnMuTWF4V2l0aFJldHJhY3RBZ2dGdW5jdGlvbvnwowlX0_Qf" +
- | "AgABTAADb3JkdAAVTHNjYWxhL21hdGgvT3JkZXJpbmc7eHIAMm9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnV" +
- | "uY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uTcYVPtJjNfwCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS" +
- | "5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvbi0B91QxuAyTAgAAeHBzcgAZc2NhbGEubWF0aC5PcmRlc" +
- | "mluZyRMb25nJOda0iCPo2ukAgAAeHA");
- | }
- |
- | public void setAggregationResults(
- | org.apache.flink.types.Row accs,
- | org.apache.flink.types.Row output) {
- |
- | org.apache.flink.table.functions.AggregateFunction baseClass0 =
- | (org.apache.flink.table.functions.AggregateFunction) fmin;
- | output.setField(5, baseClass0.getValue(
- | (org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator)
- | accs.getField(0)));
- |
- | org.apache.flink.table.functions.AggregateFunction baseClass1 =
- | (org.apache.flink.table.functions.AggregateFunction) fmax;
- | output.setField(6, baseClass1.getValue(
- | (org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator)
- | accs.getField(1)));
- | }
- |
- | public void accumulate(
- | org.apache.flink.types.Row accs,
- | org.apache.flink.types.Row input) {
- |
- | fmin.accumulate(
- | ((org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator)
- | accs.getField(0)),
- | (java.lang.Long) input.getField(4));
- |
- | fmax.accumulate(
- | ((org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator)
- | accs.getField(1)),
- | (java.lang.Long) input.getField(4));
- | }
- |
- | public void retract(
- | org.apache.flink.types.Row accs,
- | org.apache.flink.types.Row input) {
- |
- | fmin.retract(
- | ((org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator)
- | accs.getField(0)),
- | (java.lang.Long) input.getField(4));
- |
- | fmax.retract(
- | ((org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator)
- | accs.getField(1)),
- | (java.lang.Long) input.getField(4));
- | }
- |
- | public org.apache.flink.types.Row createAccumulators() {
- |
- | org.apache.flink.types.Row accs = new org.apache.flink.types.Row(2);
- |
- | accs.setField(
- | 0,
- | fmin.createAccumulator());
- |
- | accs.setField(
- | 1,
- | fmax.createAccumulator());
- |
- | return accs;
- | }
- |
- | public void setForwardedFields(
- | org.apache.flink.types.Row input,
- | org.apache.flink.types.Row output) {
- |
- | output.setField(0, input.getField(0));
- | output.setField(1, input.getField(1));
- | output.setField(2, input.getField(2));
- | output.setField(3, input.getField(3));
- | output.setField(4, input.getField(4));
- | }
- |
- | public org.apache.flink.types.Row createOutputRow() {
- | return new org.apache.flink.types.Row(7);
- | }
- |
- |/******* This test does not use the following methods *******/
- | public org.apache.flink.types.Row mergeAccumulatorsPair(
- | org.apache.flink.types.Row a,
- | org.apache.flink.types.Row b) {
- | return null;
- | }
- |
- | public void resetAccumulator(org.apache.flink.types.Row accs) {
- | }
- |
- | public void setConstantFlags(org.apache.flink.types.Row output) {
- | }
- |}
- """.stripMargin
-
- val funcName = "BoundedOverAggregateHelper$33"
-
- val genAggFunction = GeneratedAggregationsFunction(funcName, funcCode)
- val processFunction = new KeyedProcessOperator[String, CRow, CRow](
- new ProcTimeBoundedRangeOver(
- genAggFunction,
- 1000,
- aggregationStateType,
- rT))
-
- val testHarness = new KeyedOneInputStreamOperatorTestHarness[JInt, CRow, CRow](
- processFunction,
- new TupleRowSelector(0),
- BasicTypeInfo.INT_TYPE_INFO)
-
- testHarness.open()
-
- // Time = 3
- testHarness.setProcessingTime(3)
- // key = 1
- testHarness.processElement(new StreamRecord(
- new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 0))
- // key = 2
- testHarness.processElement(new StreamRecord(
- new CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), true), 0))
-
- // Time = 4
- testHarness.setProcessingTime(4)
- // key = 1
- testHarness.processElement(new StreamRecord(
- new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 0))
- testHarness.processElement(new StreamRecord(
- new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 0))
- // key = 2
- testHarness.processElement(new StreamRecord(
- new CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), true), 0))
-
- // Time = 5
- testHarness.setProcessingTime(5)
- testHarness.processElement(new StreamRecord(
- new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), true), 0))
-
- // Time = 6
- testHarness.setProcessingTime(6)
-
- // Time = 1002
- testHarness.setProcessingTime(1002)
- // key = 1
- testHarness.processElement(new StreamRecord(
- new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 0))
- testHarness.processElement(new StreamRecord(
- new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 0))
- // key = 2
- testHarness.processElement(new StreamRecord(
- new CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 0))
-
- // Time = 1003
- testHarness.setProcessingTime(1003)
- testHarness.processElement(new StreamRecord(
- new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 0))
-
- // Time = 1004
- testHarness.setProcessingTime(1004)
- testHarness.processElement(new StreamRecord(
- new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 0))
-
- // Time = 1005
- testHarness.setProcessingTime(1005)
- // key = 1
- testHarness.processElement(new StreamRecord(
- new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 0))
- testHarness.processElement(new StreamRecord(
- new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 0))
- // key = 2
- testHarness.processElement(new StreamRecord(
- new CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 0))
-
- testHarness.setProcessingTime(1006)
-
- val result = testHarness.getOutput
-
- val expectedOutput = new ConcurrentLinkedQueue[Object]()
-
- // all elements at the same proc timestamp have the same value
- expectedOutput.add(new StreamRecord(new CRow(
- Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true), 4))
- expectedOutput.add(new StreamRecord(new CRow(
- Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true), 4))
- expectedOutput.add(new StreamRecord(new CRow(
- Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 3L: JLong), true), 5))
- expectedOutput.add(new StreamRecord(new CRow(
- Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true), 5))
- expectedOutput.add(new StreamRecord(new CRow(
- Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true), 5))
- expectedOutput.add(new StreamRecord(new CRow(
- Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 1L: JLong, 4L: JLong), true), 6))
- expectedOutput.add(new StreamRecord(new CRow(
- Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 1L: JLong, 6L: JLong), true), 1003))
- expectedOutput.add(new StreamRecord(new CRow(
- Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 1L: JLong, 6L: JLong), true), 1003))
- expectedOutput.add(new StreamRecord(new CRow(
- Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), true), 1003))
- expectedOutput.add(new StreamRecord(new CRow(
- Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 1L: JLong, 7L: JLong), true), 1004))
- expectedOutput.add(new StreamRecord(new CRow(
- Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 2L: JLong, 8L: JLong), true), 1005))
- expectedOutput.add(new StreamRecord(new CRow(
- Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 4L: JLong, 10L: JLong), true), 1006))
- expectedOutput.add(new StreamRecord(new CRow(
- Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 4L: JLong, 10L: JLong), true), 1006))
- expectedOutput.add(new StreamRecord(new CRow(
- Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 30L: JLong, 40L: JLong), true), 1006))
-
- TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.",
- expectedOutput, result, new RowResultSortComparator(6))
-
- testHarness.close()
-
- }
-}
-
-object BoundedProcessingOverRangeProcessFunctionTest {
-
-/**
- * Return 0 for equal CRows and non zero for different CRows
- */
-class RowResultSortComparator(indexCounter: Int) extends Comparator[Object] with Serializable {
-
- override def compare(o1: Object, o2: Object):Int = {
-
- if (o1.isInstanceOf[Watermark] || o2.isInstanceOf[Watermark]) {
- // watermark is not expected
- -1
- } else {
- val row1 = o1.asInstanceOf[StreamRecord[CRow]].getValue
- val row2 = o2.asInstanceOf[StreamRecord[CRow]].getValue
- row1.toString.compareTo(row2.toString)
- }
- }
-}
-
-/**
- * Simple test class that returns a specified field as the selector function
- */
-class TupleRowSelector(
- private val selectorField:Int) extends KeySelector[CRow, Integer] {
-
- override def getKey(value: CRow): Integer = {
- value.row.getField(selectorField).asInstanceOf[Integer]
- }
-}
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
index eb5acd5b..77798f9 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
@@ -19,15 +19,294 @@ package org.apache.flink.table.runtime.harness
import java.util.{Comparator, Queue => JQueue}
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{INT_TYPE_INFO, LONG_TYPE_INFO, STRING_TYPE_INFO}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.operators.OneInputStreamOperator
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
import org.apache.flink.streaming.util.{KeyedOneInputStreamOperatorTestHarness, TestHarnessUtil}
-import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.codegen.GeneratedAggregationsFunction
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.flink.table.functions.aggfunctions.{LongMaxWithRetractAggFunction, LongMinWithRetractAggFunction, IntSumWithRetractAggFunction}
+import org.apache.flink.table.runtime.aggregate.AggregateUtil
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
class HarnessTestBase {
+
+ protected val MinMaxRowType = new RowTypeInfo(Array[TypeInformation[_]](
+ INT_TYPE_INFO,
+ LONG_TYPE_INFO,
+ INT_TYPE_INFO,
+ STRING_TYPE_INFO,
+ LONG_TYPE_INFO),
+ Array("a", "b", "c", "d", "e"))
+
+ protected val SumRowType = new RowTypeInfo(Array[TypeInformation[_]](
+ LONG_TYPE_INFO,
+ INT_TYPE_INFO,
+ STRING_TYPE_INFO),
+ Array("a", "b", "c"))
+
+ protected val minMaxCRowType = new CRowTypeInfo(MinMaxRowType)
+ protected val sumCRowType = new CRowTypeInfo(SumRowType)
+
+ protected val minMaxAggregates =
+ Array(new LongMinWithRetractAggFunction,
+ new LongMaxWithRetractAggFunction).asInstanceOf[Array[AggregateFunction[_, _]]]
+
+ protected val sumAggregates =
+ Array(new IntSumWithRetractAggFunction).asInstanceOf[Array[AggregateFunction[_, _]]]
+
+ protected val minMaxAggregationStateType: RowTypeInfo =
+ AggregateUtil.createAccumulatorRowType(minMaxAggregates)
+
+ protected val sumAggregationStateType: RowTypeInfo =
+ AggregateUtil.createAccumulatorRowType(sumAggregates)
+
+ val minMaxCode: String =
+ """
+ |public class MinMaxAggregateHelper
+ | extends org.apache.flink.table.runtime.aggregate.GeneratedAggregations {
+ |
+ | transient org.apache.flink.table.functions.aggfunctions.LongMinWithRetractAggFunction
+ | fmin = null;
+ |
+ | transient org.apache.flink.table.functions.aggfunctions.LongMaxWithRetractAggFunction
+ | fmax = null;
+ |
+ | public MinMaxAggregateHelper() throws Exception {
+ |
+ | fmin = (org.apache.flink.table.functions.aggfunctions.LongMinWithRetractAggFunction)
+ | org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
+ | .deserialize("rO0ABXNyAEtvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbn" +
+ | "MuTG9uZ01pbldpdGhSZXRyYWN0QWdnRnVuY3Rpb26oIdX_DaMPxQIAAHhyAEdvcmcuYXBhY2hlLmZsaW5rL" +
+ | "nRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbnMuTWluV2l0aFJldHJhY3RBZ2dGdW5jdGlvbq_ZGuzxtA_S" +
+ | "AgABTAADb3JkdAAVTHNjYWxhL21hdGgvT3JkZXJpbmc7eHIAMm9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnV" +
+ | "uY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uTcYVPtJjNfwCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS" +
+ | "5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvbi0B91QxuAyTAgAAeHBzcgAZc2NhbGEubWF0aC5PcmRlc" +
+ | "mluZyRMb25nJOda0iCPo2ukAgAAeHA");
+ |
+ | fmax = (org.apache.flink.table.functions.aggfunctions.LongMaxWithRetractAggFunction)
+ | org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
+ | .deserialize("rO0ABXNyAEtvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbn" +
+ | "MuTG9uZ01heFdpdGhSZXRyYWN0QWdnRnVuY3Rpb25RmsI8azNGXwIAAHhyAEdvcmcuYXBhY2hlLmZsaW5rL" +
+ | "nRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbnMuTWF4V2l0aFJldHJhY3RBZ2dGdW5jdGlvbvnwowlX0_Qf" +
+ | "AgABTAADb3JkdAAVTHNjYWxhL21hdGgvT3JkZXJpbmc7eHIAMm9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnV" +
+ | "uY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uTcYVPtJjNfwCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS" +
+ | "5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvbi0B91QxuAyTAgAAeHBzcgAZc2NhbGEubWF0aC5PcmRlc" +
+ | "mluZyRMb25nJOda0iCPo2ukAgAAeHA");
+ | }
+ |
+ | public void setAggregationResults(
+ | org.apache.flink.types.Row accs,
+ | org.apache.flink.types.Row output) {
+ |
+ | org.apache.flink.table.functions.AggregateFunction baseClass0 =
+ | (org.apache.flink.table.functions.AggregateFunction) fmin;
+ | output.setField(5, baseClass0.getValue(
+ | (org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator)
+ | accs.getField(0)));
+ |
+ | org.apache.flink.table.functions.AggregateFunction baseClass1 =
+ | (org.apache.flink.table.functions.AggregateFunction) fmax;
+ | output.setField(6, baseClass1.getValue(
+ | (org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator)
+ | accs.getField(1)));
+ | }
+ |
+ | public void accumulate(
+ | org.apache.flink.types.Row accs,
+ | org.apache.flink.types.Row input) {
+ |
+ | fmin.accumulate(
+ | ((org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator)
+ | accs.getField(0)),
+ | (java.lang.Long) input.getField(4));
+ |
+ | fmax.accumulate(
+ | ((org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator)
+ | accs.getField(1)),
+ | (java.lang.Long) input.getField(4));
+ | }
+ |
+ | public void retract(
+ | org.apache.flink.types.Row accs,
+ | org.apache.flink.types.Row input) {
+ |
+ | fmin.retract(
+ | ((org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator)
+ | accs.getField(0)),
+ | (java.lang.Long) input.getField(4));
+ |
+ | fmax.retract(
+ | ((org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator)
+ | accs.getField(1)),
+ | (java.lang.Long) input.getField(4));
+ | }
+ |
+ | public org.apache.flink.types.Row createAccumulators() {
+ |
+ | org.apache.flink.types.Row accs = new org.apache.flink.types.Row(2);
+ |
+ | accs.setField(
+ | 0,
+ | fmin.createAccumulator());
+ |
+ | accs.setField(
+ | 1,
+ | fmax.createAccumulator());
+ |
+ | return accs;
+ | }
+ |
+ | public void setForwardedFields(
+ | org.apache.flink.types.Row input,
+ | org.apache.flink.types.Row output) {
+ |
+ | output.setField(0, input.getField(0));
+ | output.setField(1, input.getField(1));
+ | output.setField(2, input.getField(2));
+ | output.setField(3, input.getField(3));
+ | output.setField(4, input.getField(4));
+ | }
+ |
+ | public org.apache.flink.types.Row createOutputRow() {
+ | return new org.apache.flink.types.Row(7);
+ | }
+ |
+ |/******* This test does not use the following methods *******/
+ | public org.apache.flink.types.Row mergeAccumulatorsPair(
+ | org.apache.flink.types.Row a,
+ | org.apache.flink.types.Row b) {
+ | return null;
+ | }
+ |
+ | public void resetAccumulator(org.apache.flink.types.Row accs) {
+ | }
+ |
+ | public void setConstantFlags(org.apache.flink.types.Row output) {
+ | }
+ |}
+ """.stripMargin
+
+ val sumAggCode: String =
+ """
+ |public final class SumAggregationHelper
+ | extends org.apache.flink.table.runtime.aggregate.GeneratedAggregations {
+ |
+ |
+ |transient org.apache.flink.table.functions.aggfunctions.IntSumWithRetractAggFunction
+ |sum = null;
+ |private final org.apache.flink.table.runtime.aggregate.SingleElementIterable<org.apache
+ | .flink.table.functions.aggfunctions.SumWithRetractAccumulator> accIt0 =
+ | new org.apache.flink.table.runtime.aggregate.SingleElementIterable<org.apache.flink
+ | .table
+ | .functions.aggfunctions.SumWithRetractAccumulator>();
+ |
+ | public SumAggregationHelper() throws Exception {
+ |
+ |sum = (org.apache.flink.table.functions.aggfunctions.IntSumWithRetractAggFunction)
+ |org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
+ |.deserialize
+ |("rO0ABXNyAEpvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbnMuSW50U3VtV2l0a" +
+ |"FJldHJhY3RBZ2dGdW5jdGlvblkfWkeNZDeDAgAAeHIAR29yZy5hcGFjaGUuZmxpbmsudGFibGUuZnVuY3Rpb25" +
+ |"zLmFnZ2Z1bmN0aW9ucy5TdW1XaXRoUmV0cmFjdEFnZ0Z1bmN0aW9ut2oWrOsLrs0CAAFMAAdudW1lcmljdAAUT" +
+ |"HNjYWxhL21hdGgvTnVtZXJpYzt4cgAyb3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuQWdncmVnYXR" +
+ |"lRnVuY3Rpb25NxhU-0mM1_AIAAHhyADRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5Vc2VyRGVma" +
+ |"W5lZEZ1bmN0aW9uLQH3VDG4DJMCAAB4cHNyACFzY2FsYS5tYXRoLk51bWVyaWMkSW50SXNJbnRlZ3JhbCTw6XA" +
+ |"59sPAzAIAAHhw");
+ |
+ |
+ | }
+ |
+ | public final void setAggregationResults(
+ | org.apache.flink.types.Row accs,
+ | org.apache.flink.types.Row output) {
+ |
+ | org.apache.flink.table.functions.AggregateFunction baseClass0 =
+ | (org.apache.flink.table.functions.AggregateFunction)
+ | sum;
+ |
+ | output.setField(
+ | 1,
+ | baseClass0.getValue((org.apache.flink.table.functions.aggfunctions
+ | .SumWithRetractAccumulator) accs.getField(0)));
+ | }
+ |
+ | public final void accumulate(
+ | org.apache.flink.types.Row accs,
+ | org.apache.flink.types.Row input) {
+ |
+ | sum.accumulate(
+ | ((org.apache.flink.table.functions.aggfunctions.SumWithRetractAccumulator) accs
+ | .getField
+ | (0)),
+ | (java.lang.Integer) input.getField(1));
+ | }
+ |
+ |
+ | public final void retract(
+ | org.apache.flink.types.Row accs,
+ | org.apache.flink.types.Row input) {
+ | }
+ |
+ | public final org.apache.flink.types.Row createAccumulators()
+ | {
+ |
+ | org.apache.flink.types.Row accs =
+ | new org.apache.flink.types.Row(1);
+ |
+ | accs.setField(
+ | 0,
+ | sum.createAccumulator());
+ |
+ | return accs;
+ | }
+ |
+ | public final void setForwardedFields(
+ | org.apache.flink.types.Row input,
+ | org.apache.flink.types.Row output)
+ | {
+ |
+ | output.setField(
+ | 0,
+ | input.getField(0));
+ | }
+ |
+ | public final void setConstantFlags(org.apache.flink.types.Row output)
+ | {
+ |
+ | }
+ |
+ | public final org.apache.flink.types.Row createOutputRow() {
+ | return new org.apache.flink.types.Row(2);
+ | }
+ |
+ |
+ | public final org.apache.flink.types.Row mergeAccumulatorsPair(
+ | org.apache.flink.types.Row a,
+ | org.apache.flink.types.Row b)
+ | {
+ |
+ | return a;
+ |
+ | }
+ |
+ | public final void resetAccumulator(
+ | org.apache.flink.types.Row accs) {
+ | }
+ |}
+ |""".stripMargin
+
+
+ protected val minMaxFuncName = "MinMaxAggregateHelper"
+ protected val sumFuncName = "SumAggregationHelper"
+
+ protected val genMinMaxAggFunction = GeneratedAggregationsFunction(minMaxFuncName, minMaxCode)
+ protected val genSumAggFunction = GeneratedAggregationsFunction(sumFuncName, sumAggCode)
+
def createHarnessTester[IN, OUT, KEY](
operator: OneInputStreamOperator[IN, OUT],
keySelector: KeySelector[IN, KEY],
http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala
new file mode 100644
index 0000000..04214f9
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.harness
+
+import java.lang.{Integer => JInt, Long => JLong}
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.runtime.harness.HarnessTestBase._
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.junit.Test
+
+class NonWindowHarnessTest extends HarnessTestBase {
+
+ protected var queryConfig =
+ new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(2), Time.seconds(3))
+
+ @Test
+ def testProcTimeNonWindow(): Unit = {
+
+ val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+ new GroupAggProcessFunction(
+ genSumAggFunction,
+ sumAggregationStateType,
+ false,
+ queryConfig))
+
+ val testHarness =
+ createHarnessTester(
+ processFunction,
+ new TupleRowKeySelector[String](2),
+ BasicTypeInfo.STRING_TYPE_INFO)
+
+ testHarness.open()
+
+ // register cleanup timer with 3001
+ testHarness.setProcessingTime(1)
+
+ testHarness.processElement(new StreamRecord(CRow(Row.of(1L: JLong, 1: JInt, "aaa"), true), 1))
+ testHarness.processElement(new StreamRecord(CRow(Row.of(2L: JLong, 1: JInt, "bbb"), true), 1))
+ // reuse timer 3001
+ testHarness.setProcessingTime(1000)
+ testHarness.processElement(new StreamRecord(CRow(Row.of(3L: JLong, 2: JInt, "aaa"), true), 1))
+ testHarness.processElement(new StreamRecord(CRow(Row.of(4L: JLong, 3: JInt, "aaa"), true), 1))
+
+ // register cleanup timer with 4002
+ testHarness.setProcessingTime(1002)
+ testHarness.processElement(new StreamRecord(CRow(Row.of(5L: JLong, 4: JInt, "aaa"), true), 1))
+ testHarness.processElement(new StreamRecord(CRow(Row.of(6L: JLong, 2: JInt, "bbb"), true), 1))
+
+ // trigger cleanup timer and register cleanup timer with 7003
+ testHarness.setProcessingTime(4003)
+ testHarness.processElement(new StreamRecord(CRow(Row.of(7L: JLong, 5: JInt, "aaa"), true), 1))
+ testHarness.processElement(new StreamRecord(CRow(Row.of(8L: JLong, 6: JInt, "aaa"), true), 1))
+ testHarness.processElement(new StreamRecord(CRow(Row.of(9L: JLong, 7: JInt, "aaa"), true), 1))
+ testHarness.processElement(new StreamRecord(CRow(Row.of(10L: JLong, 3: JInt, "bbb"), true), 1))
+
+ val result = testHarness.getOutput
+
+ val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+ expectedOutput.add(new StreamRecord(CRow(Row.of(1L: JLong, 1: JInt), true), 1))
+ expectedOutput.add(new StreamRecord(CRow(Row.of(2L: JLong, 1: JInt), true), 1))
+ expectedOutput.add(new StreamRecord(CRow(Row.of(3L: JLong, 3: JInt), true), 1))
+ expectedOutput.add(new StreamRecord(CRow(Row.of(4L: JLong, 6: JInt), true), 1))
+ expectedOutput.add(new StreamRecord(CRow(Row.of(5L: JLong, 10: JInt), true), 1))
+ expectedOutput.add(new StreamRecord(CRow(Row.of(6L: JLong, 3: JInt), true), 1))
+ expectedOutput.add(new StreamRecord(CRow(Row.of(7L: JLong, 5: JInt), true), 1))
+ expectedOutput.add(new StreamRecord(CRow(Row.of(8L: JLong, 11: JInt), true), 1))
+ expectedOutput.add(new StreamRecord(CRow(Row.of(9L: JLong, 18: JInt), true), 1))
+ expectedOutput.add(new StreamRecord(CRow(Row.of(10L: JLong, 3: JInt), true), 1))
+
+ verify(expectedOutput, result, new RowResultSortComparator(6))
+
+ testHarness.close()
+ }
+
+ @Test
+ def testProcTimeNonWindowWithRetract(): Unit = {
+
+ val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+ new GroupAggProcessFunction(
+ genSumAggFunction,
+ sumAggregationStateType,
+ true,
+ queryConfig))
+
+ val testHarness =
+ createHarnessTester(
+ processFunction,
+ new TupleRowKeySelector[String](2),
+ BasicTypeInfo.STRING_TYPE_INFO)
+
+ testHarness.open()
+
+ // register cleanup timer with 3001
+ testHarness.setProcessingTime(1)
+
+ testHarness.processElement(new StreamRecord(CRow(Row.of(1L: JLong, 1: JInt, "aaa"), true), 1))
+ testHarness.processElement(new StreamRecord(CRow(Row.of(2L: JLong, 1: JInt, "bbb"), true), 2))
+ testHarness.processElement(new StreamRecord(CRow(Row.of(3L: JLong, 2: JInt, "aaa"), true), 3))
+ testHarness.processElement(new StreamRecord(CRow(Row.of(4L: JLong, 3: JInt, "ccc"), true), 4))
+
+ // trigger cleanup timer and register cleanup timer with 6002
+ testHarness.setProcessingTime(3002)
+ testHarness.processElement(new StreamRecord(CRow(Row.of(5L: JLong, 4: JInt, "aaa"), true), 5))
+ testHarness.processElement(new StreamRecord(CRow(Row.of(6L: JLong, 2: JInt, "bbb"), true), 6))
+ testHarness.processElement(new StreamRecord(CRow(Row.of(7L: JLong, 5: JInt, "aaa"), true), 7))
+ testHarness.processElement(new StreamRecord(CRow(Row.of(8L: JLong, 6: JInt, "eee"), true), 8))
+ testHarness.processElement(new StreamRecord(CRow(Row.of(9L: JLong, 7: JInt, "aaa"), true), 9))
+ testHarness.processElement(new StreamRecord(CRow(Row.of(10L: JLong, 3: JInt, "bbb"), true), 10))
+
+ val result = testHarness.getOutput
+
+ val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+ expectedOutput.add(new StreamRecord(CRow(Row.of(1L: JLong, 1: JInt), true), 1))
+ expectedOutput.add(new StreamRecord(CRow(Row.of(2L: JLong, 1: JInt), true), 2))
+ expectedOutput.add(new StreamRecord(CRow(Row.of(3L: JLong, 1: JInt), false), 3))
+ expectedOutput.add(new StreamRecord(CRow(Row.of(3L: JLong, 3: JInt), true), 3))
+ expectedOutput.add(new StreamRecord(CRow(Row.of(4L: JLong, 3: JInt), true), 4))
+ expectedOutput.add(new StreamRecord(CRow(Row.of(5L: JLong, 4: JInt), true), 5))
+ expectedOutput.add(new StreamRecord(CRow(Row.of(6L: JLong, 2: JInt), true), 6))
+ expectedOutput.add(new StreamRecord(CRow(Row.of(7L: JLong, 4: JInt), false), 7))
+ expectedOutput.add(new StreamRecord(CRow(Row.of(7L: JLong, 9: JInt), true), 7))
+ expectedOutput.add(new StreamRecord(CRow(Row.of(8L: JLong, 6: JInt), true), 8))
+ expectedOutput.add(new StreamRecord(CRow(Row.of(9L: JLong, 9: JInt), false), 9))
+ expectedOutput.add(new StreamRecord(CRow(Row.of(9L: JLong, 16: JInt), true), 9))
+ expectedOutput.add(new StreamRecord(CRow(Row.of(10L: JLong, 2: JInt), false), 10))
+ expectedOutput.add(new StreamRecord(CRow(Row.of(10L: JLong, 5: JInt), true), 10))
+
+ verify(expectedOutput, result, new RowResultSortComparator(0))
+
+ testHarness.close()
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
index 56ca85c..786a843 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
@@ -18,180 +18,34 @@
package org.apache.flink.table.runtime.harness
import java.lang.{Integer => JInt, Long => JLong}
-import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.{ConcurrentLinkedQueue}
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.streaming.api.operators.KeyedProcessOperator
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
-import org.apache.flink.table.codegen.GeneratedAggregationsFunction
-import org.apache.flink.table.functions.AggregateFunction
-import org.apache.flink.table.functions.aggfunctions.{LongMaxWithRetractAggFunction, LongMinWithRetractAggFunction}
+import org.apache.flink.table.api.StreamQueryConfig
import org.apache.flink.table.runtime.aggregate._
import org.apache.flink.table.runtime.harness.HarnessTestBase._
-import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.table.runtime.types.CRow
import org.apache.flink.types.Row
import org.junit.Test
class OverWindowHarnessTest extends HarnessTestBase{
- private val rT = new RowTypeInfo(Array[TypeInformation[_]](
- INT_TYPE_INFO,
- LONG_TYPE_INFO,
- INT_TYPE_INFO,
- STRING_TYPE_INFO,
- LONG_TYPE_INFO),
- Array("a", "b", "c", "d", "e"))
-
- private val cRT = new CRowTypeInfo(rT)
-
- private val aggregates =
- Array(new LongMinWithRetractAggFunction,
- new LongMaxWithRetractAggFunction).asInstanceOf[Array[AggregateFunction[_, _]]]
- private val aggregationStateType: RowTypeInfo = AggregateUtil.createAccumulatorRowType(aggregates)
-
- val funcCode: String =
- """
- |public class BoundedOverAggregateHelper
- | extends org.apache.flink.table.runtime.aggregate.GeneratedAggregations {
- |
- | transient org.apache.flink.table.functions.aggfunctions.LongMinWithRetractAggFunction
- | fmin = null;
- |
- | transient org.apache.flink.table.functions.aggfunctions.LongMaxWithRetractAggFunction
- | fmax = null;
- |
- | public BoundedOverAggregateHelper() throws Exception {
- |
- | fmin = (org.apache.flink.table.functions.aggfunctions.LongMinWithRetractAggFunction)
- | org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
- | .deserialize("rO0ABXNyAEtvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbn" +
- | "MuTG9uZ01pbldpdGhSZXRyYWN0QWdnRnVuY3Rpb26oIdX_DaMPxQIAAHhyAEdvcmcuYXBhY2hlLmZsaW5rL" +
- | "nRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbnMuTWluV2l0aFJldHJhY3RBZ2dGdW5jdGlvbq_ZGuzxtA_S" +
- | "AgABTAADb3JkdAAVTHNjYWxhL21hdGgvT3JkZXJpbmc7eHIAMm9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnV" +
- | "uY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uTcYVPtJjNfwCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS" +
- | "5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvbi0B91QxuAyTAgAAeHBzcgAZc2NhbGEubWF0aC5PcmRlc" +
- | "mluZyRMb25nJOda0iCPo2ukAgAAeHA");
- |
- | fmax = (org.apache.flink.table.functions.aggfunctions.LongMaxWithRetractAggFunction)
- | org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
- | .deserialize("rO0ABXNyAEtvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbn" +
- | "MuTG9uZ01heFdpdGhSZXRyYWN0QWdnRnVuY3Rpb25RmsI8azNGXwIAAHhyAEdvcmcuYXBhY2hlLmZsaW5rL" +
- | "nRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbnMuTWF4V2l0aFJldHJhY3RBZ2dGdW5jdGlvbvnwowlX0_Qf" +
- | "AgABTAADb3JkdAAVTHNjYWxhL21hdGgvT3JkZXJpbmc7eHIAMm9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnV" +
- | "uY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uTcYVPtJjNfwCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS" +
- | "5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvbi0B91QxuAyTAgAAeHBzcgAZc2NhbGEubWF0aC5PcmRlc" +
- | "mluZyRMb25nJOda0iCPo2ukAgAAeHA");
- | }
- |
- | public void setAggregationResults(
- | org.apache.flink.types.Row accs,
- | org.apache.flink.types.Row output) {
- |
- | org.apache.flink.table.functions.AggregateFunction baseClass0 =
- | (org.apache.flink.table.functions.AggregateFunction) fmin;
- | output.setField(5, baseClass0.getValue(
- | (org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator)
- | accs.getField(0)));
- |
- | org.apache.flink.table.functions.AggregateFunction baseClass1 =
- | (org.apache.flink.table.functions.AggregateFunction) fmax;
- | output.setField(6, baseClass1.getValue(
- | (org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator)
- | accs.getField(1)));
- | }
- |
- | public void accumulate(
- | org.apache.flink.types.Row accs,
- | org.apache.flink.types.Row input) {
- |
- | fmin.accumulate(
- | ((org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator)
- | accs.getField(0)),
- | (java.lang.Long) input.getField(4));
- |
- | fmax.accumulate(
- | ((org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator)
- | accs.getField(1)),
- | (java.lang.Long) input.getField(4));
- | }
- |
- | public void retract(
- | org.apache.flink.types.Row accs,
- | org.apache.flink.types.Row input) {
- |
- | fmin.retract(
- | ((org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator)
- | accs.getField(0)),
- | (java.lang.Long) input.getField(4));
- |
- | fmax.retract(
- | ((org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator)
- | accs.getField(1)),
- | (java.lang.Long) input.getField(4));
- | }
- |
- | public org.apache.flink.types.Row createAccumulators() {
- |
- | org.apache.flink.types.Row accs = new org.apache.flink.types.Row(2);
- |
- | accs.setField(
- | 0,
- | fmin.createAccumulator());
- |
- | accs.setField(
- | 1,
- | fmax.createAccumulator());
- |
- | return accs;
- | }
- |
- | public void setForwardedFields(
- | org.apache.flink.types.Row input,
- | org.apache.flink.types.Row output) {
- |
- | output.setField(0, input.getField(0));
- | output.setField(1, input.getField(1));
- | output.setField(2, input.getField(2));
- | output.setField(3, input.getField(3));
- | output.setField(4, input.getField(4));
- | }
- |
- | public org.apache.flink.types.Row createOutputRow() {
- | return new org.apache.flink.types.Row(7);
- | }
- |
- |/******* This test does not use the following methods *******/
- | public org.apache.flink.types.Row mergeAccumulatorsPair(
- | org.apache.flink.types.Row a,
- | org.apache.flink.types.Row b) {
- | return null;
- | }
- |
- | public void resetAccumulator(org.apache.flink.types.Row accs) {
- | }
- |
- | public void setConstantFlags(org.apache.flink.types.Row output) {
- | }
- |}
- """.stripMargin
-
-
- private val funcName = "BoundedOverAggregateHelper"
-
- private val genAggFunction = GeneratedAggregationsFunction(funcName, funcCode)
-
+ protected var queryConfig =
+ new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(2), Time.seconds(3))
@Test
def testProcTimeBoundedRowsOver(): Unit = {
val processFunction = new KeyedProcessOperator[String, CRow, CRow](
new ProcTimeBoundedRowsOver(
- genAggFunction,
+ genMinMaxAggFunction,
2,
- aggregationStateType,
- cRT))
+ minMaxAggregationStateType,
+ minMaxCRowType,
+ queryConfig))
val testHarness =
createHarnessTester(processFunction,new TupleRowKeySelector[Integer](0),BasicTypeInfo
@@ -199,6 +53,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
testHarness.open()
+ // register cleanup timer with 3001
testHarness.setProcessingTime(1)
testHarness.processElement(new StreamRecord(
@@ -209,6 +64,9 @@ class OverWindowHarnessTest extends HarnessTestBase{
CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 1))
testHarness.processElement(new StreamRecord(
CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 1))
+
+ // register cleanup timer with 4100
+ testHarness.setProcessingTime(1100)
testHarness.processElement(new StreamRecord(
CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), true), 1))
testHarness.processElement(new StreamRecord(
@@ -220,15 +78,19 @@ class OverWindowHarnessTest extends HarnessTestBase{
testHarness.processElement(new StreamRecord(
CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 1))
- testHarness.setProcessingTime(2)
+ // register cleanup timer with 6001
+ testHarness.setProcessingTime(3001)
testHarness.processElement(new StreamRecord(
CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 2))
testHarness.processElement(new StreamRecord(
CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 2))
testHarness.processElement(new StreamRecord(
CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 2))
+
+ // trigger cleanup timer and register cleanup timer with 9002
+ testHarness.setProcessingTime(6002)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 2))
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 2))
testHarness.processElement(new StreamRecord(
CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 2))
@@ -274,10 +136,10 @@ class OverWindowHarnessTest extends HarnessTestBase{
Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 8L: JLong, 9L: JLong), true), 2))
expectedOutput.add(new StreamRecord(
CRow(
- Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 9L: JLong, 10L: JLong), true), 2))
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 10L: JLong, 10L: JLong), true), 2))
expectedOutput.add(new StreamRecord(
CRow(
- Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 30L: JLong, 40L: JLong), true), 2))
+ Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 40L: JLong, 40L: JLong), true), 2))
verify(expectedOutput, result, new RowResultSortComparator(6))
@@ -292,10 +154,11 @@ class OverWindowHarnessTest extends HarnessTestBase{
val processFunction = new KeyedProcessOperator[String, CRow, CRow](
new ProcTimeBoundedRangeOver(
- genAggFunction,
- 1000,
- aggregationStateType,
- cRT))
+ genMinMaxAggFunction,
+ 4000,
+ minMaxAggregationStateType,
+ minMaxCRowType,
+ queryConfig))
val testHarness =
createHarnessTester(
@@ -305,6 +168,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
testHarness.open()
+ // register cleanup timer with 3003
testHarness.setProcessingTime(3)
testHarness.processElement(new StreamRecord(
CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 0))
@@ -314,6 +178,9 @@ class OverWindowHarnessTest extends HarnessTestBase{
testHarness.setProcessingTime(4)
testHarness.processElement(new StreamRecord(
CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 0))
+
+ // trigger cleanup timer and register cleanup timer with 6003
+ testHarness.setProcessingTime(3003)
testHarness.processElement(new StreamRecord(
CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 0))
testHarness.processElement(new StreamRecord(
@@ -323,9 +190,10 @@ class OverWindowHarnessTest extends HarnessTestBase{
testHarness.processElement(new StreamRecord(
CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), true), 0))
- testHarness.setProcessingTime(6)
+ // register cleanup timer with 9002
+ testHarness.setProcessingTime(6002)
- testHarness.setProcessingTime(1002)
+ testHarness.setProcessingTime(7002)
testHarness.processElement(new StreamRecord(
CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 0))
testHarness.processElement(new StreamRecord(
@@ -333,15 +201,15 @@ class OverWindowHarnessTest extends HarnessTestBase{
testHarness.processElement(new StreamRecord(
CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 0))
- testHarness.setProcessingTime(1003)
+ // register cleanup timer with 14002
+ testHarness.setProcessingTime(11002)
testHarness.processElement(new StreamRecord(
CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 0))
- testHarness.setProcessingTime(1004)
+ testHarness.setProcessingTime(11004)
testHarness.processElement(new StreamRecord(
CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 0))
- testHarness.setProcessingTime(1005)
testHarness.processElement(new StreamRecord(
CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 0))
testHarness.processElement(new StreamRecord(
@@ -349,7 +217,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
testHarness.processElement(new StreamRecord(
CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 0))
- testHarness.setProcessingTime(1006)
+ testHarness.setProcessingTime(11006)
val result = testHarness.getOutput
@@ -364,40 +232,40 @@ class OverWindowHarnessTest extends HarnessTestBase{
Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true), 4))
expectedOutput.add(new StreamRecord(
CRow(
- Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 3L: JLong), true), 5))
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true), 5))
expectedOutput.add(new StreamRecord(
CRow(
- Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true), 5))
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 3L: JLong, 4L: JLong), true), 3004))
expectedOutput.add(new StreamRecord(
- CRow(
- Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true), 5))
+ CRow(Row.of(
+ 2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 20L: JLong, 20L: JLong), true), 3004))
expectedOutput.add(new StreamRecord(
CRow(
- Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 1L: JLong, 4L: JLong), true), 6))
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 4L: JLong, 4L: JLong), true), 6))
expectedOutput.add(new StreamRecord(
CRow(
- Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 1L: JLong, 6L: JLong), true), 1003))
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 5L: JLong, 6L: JLong), true), 7003))
expectedOutput.add(new StreamRecord(
CRow(
- Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 1L: JLong, 6L: JLong), true), 1003))
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 5L: JLong, 6L: JLong), true), 7003))
expectedOutput.add(new StreamRecord(
CRow(
- Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), true), 1003))
+ Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 30L: JLong, 30L: JLong), true), 7003))
expectedOutput.add(new StreamRecord(
CRow(
- Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 1L: JLong, 7L: JLong), true), 1004))
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 7L: JLong, 7L: JLong), true), 11003))
expectedOutput.add(new StreamRecord(
- CRow(
- Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 2L: JLong, 8L: JLong), true), 1005))
+ CRow(Row.of(
+ 1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 7L: JLong, 10L: JLong), true), 11005))
expectedOutput.add(new StreamRecord(
- CRow(
- Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 4L: JLong, 10L: JLong), true), 1006))
+ CRow(Row.of(
+ 1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 7L: JLong, 10L: JLong), true), 11005))
expectedOutput.add(new StreamRecord(
CRow(
- Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 4L: JLong, 10L: JLong), true), 1006))
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 7L: JLong, 10L: JLong), true), 11005))
expectedOutput.add(new StreamRecord(
CRow(
- Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 30L: JLong, 40L: JLong), true), 1006))
+ Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 40L: JLong, 40L: JLong), true), 11005))
verify(expectedOutput, result, new RowResultSortComparator(6))
@@ -409,8 +277,9 @@ class OverWindowHarnessTest extends HarnessTestBase{
val processFunction = new KeyedProcessOperator[String, CRow, CRow](
new ProcTimeUnboundedPartitionedOver(
- genAggFunction,
- aggregationStateType))
+ genMinMaxAggFunction,
+ minMaxAggregationStateType,
+ queryConfig))
val testHarness =
createHarnessTester(
@@ -420,6 +289,9 @@ class OverWindowHarnessTest extends HarnessTestBase{
testHarness.open()
+ // register cleanup timer with 4003
+ testHarness.setProcessingTime(1003)
+
testHarness.processElement(new StreamRecord(
CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 0))
testHarness.processElement(new StreamRecord(
@@ -438,18 +310,19 @@ class OverWindowHarnessTest extends HarnessTestBase{
CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 0))
testHarness.processElement(new StreamRecord(
CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 0))
-
- testHarness.setProcessingTime(1003)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 1003))
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 0))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 1003))
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 0))
+
+ // trigger cleanup timer and register cleanup timer with 8003
+ testHarness.setProcessingTime(5003)
testHarness.processElement(new StreamRecord(
- CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 1003))
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 5003))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 1003))
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 5003))
testHarness.processElement(new StreamRecord(
- CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 1003))
+ CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 5003))
val result = testHarness.getOutput
@@ -484,19 +357,19 @@ class OverWindowHarnessTest extends HarnessTestBase{
Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), true), 0))
expectedOutput.add(new StreamRecord(
CRow(
- Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 1L: JLong, 7L: JLong), true), 1003))
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 1L: JLong, 7L: JLong), true), 0))
expectedOutput.add(new StreamRecord(
CRow(
- Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 1L: JLong, 8L: JLong), true), 1003))
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 1L: JLong, 8L: JLong), true), 0))
expectedOutput.add(new StreamRecord(
CRow(
- Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 1L: JLong, 9L: JLong), true), 1003))
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 9L: JLong, 9L: JLong), true), 5003))
expectedOutput.add(new StreamRecord(
CRow(
- Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 1L: JLong, 10L: JLong), true), 1003))
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 9L: JLong, 10L: JLong), true), 5003))
expectedOutput.add(new StreamRecord(
CRow(
- Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 10L: JLong, 40L: JLong), true), 1003))
+ Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 40L: JLong, 40L: JLong), true), 5003))
verify(expectedOutput, result, new RowResultSortComparator(6))
testHarness.close()
@@ -510,10 +383,11 @@ class OverWindowHarnessTest extends HarnessTestBase{
val processFunction = new KeyedProcessOperator[String, CRow, CRow](
new RowTimeBoundedRangeOver(
- genAggFunction,
- aggregationStateType,
- cRT,
- 4000))
+ genMinMaxAggFunction,
+ minMaxAggregationStateType,
+ minMaxCRowType,
+ 4000,
+ new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(1), Time.seconds(2))))
val testHarness =
createHarnessTester(
@@ -573,6 +447,40 @@ class OverWindowHarnessTest extends HarnessTestBase{
testHarness.processWatermark(19000)
+ // test cleanup
+ testHarness.setProcessingTime(1000)
+ testHarness.processWatermark(20000)
+
+ // check that state is removed after max retention time
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 0L: JLong, 0: JInt, "ccc", 1L: JLong), true), 20001)) // clean-up 3000
+ testHarness.setProcessingTime(2500)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "ccc", 2L: JLong), true), 20002)) // clean-up 4500
+ testHarness.processWatermark(20010) // compute output
+
+ assert(testHarness.numKeyedStateEntries() > 0) // check that we have state
+ testHarness.setProcessingTime(4499)
+ assert(testHarness.numKeyedStateEntries() > 0) // check that we have state
+ testHarness.setProcessingTime(4500)
+ assert(testHarness.numKeyedStateEntries() == 0) // check that all state is gone
+
+ // check that state is only removed if all data was processed
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(3: JInt, 0L: JLong, 0: JInt, "ccc", 3L: JLong), true), 20011)) // clean-up 6500
+
+ assert(testHarness.numKeyedStateEntries() > 0) // check that we have state
+ testHarness.setProcessingTime(6500) // clean-up attempt but rescheduled to 8500
+ assert(testHarness.numKeyedStateEntries() > 0) // check that we have state
+
+ testHarness.processWatermark(20020) // schedule emission
+
+ assert(testHarness.numKeyedStateEntries() > 0) // check that we have state
+ testHarness.setProcessingTime(8499) // clean-up
+ assert(testHarness.numKeyedStateEntries() > 0) // check that we have state
+ testHarness.setProcessingTime(8500) // clean-up
+ assert(testHarness.numKeyedStateEntries() == 0) // check that all state is gone
+
val result = testHarness.getOutput
val expectedOutput = new ConcurrentLinkedQueue[Object]()
@@ -621,6 +529,16 @@ class OverWindowHarnessTest extends HarnessTestBase{
CRow(
Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 40L: JLong, 40L: JLong), true), 12001))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 0L: JLong, 0: JInt, "ccc", 1L: JLong, 1L: JLong, 1L: JLong), true), 20001))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(2: JInt, 0L: JLong, 0: JInt, "ccc", 2L: JLong, 1L: JLong, 2L: JLong), true), 20002))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(3: JInt, 0L: JLong, 0: JInt, "ccc", 3L: JLong, 3L: JLong, 3L: JLong), true), 20011))
+
verify(expectedOutput, result, new RowResultSortComparator(6))
testHarness.close()
}
@@ -630,10 +548,11 @@ class OverWindowHarnessTest extends HarnessTestBase{
val processFunction = new KeyedProcessOperator[String, CRow, CRow](
new RowTimeBoundedRowsOver(
- genAggFunction,
- aggregationStateType,
- cRT,
- 3))
+ genMinMaxAggFunction,
+ minMaxAggregationStateType,
+ minMaxCRowType,
+ 3,
+ new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(1), Time.seconds(2))))
val testHarness =
createHarnessTester(
@@ -689,6 +608,41 @@ class OverWindowHarnessTest extends HarnessTestBase{
testHarness.processWatermark(19000)
+ // test cleanup
+ testHarness.setProcessingTime(1000)
+ testHarness.processWatermark(20000)
+
+ // check that state is removed after max retention time
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 0L: JLong, 0: JInt, "ccc", 1L: JLong), true), 20001)) // clean-up 3000
+ testHarness.setProcessingTime(2500)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "ccc", 2L: JLong), true), 20002)) // clean-up 4500
+ testHarness.processWatermark(20010) // compute output
+
+ assert(testHarness.numKeyedStateEntries() > 0) // check that we have state
+ testHarness.setProcessingTime(4499)
+ assert(testHarness.numKeyedStateEntries() > 0) // check that we have state
+ testHarness.setProcessingTime(4500)
+ assert(testHarness.numKeyedStateEntries() == 0) // check that all state is gone
+
+ // check that state is only removed if all data was processed
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(3: JInt, 0L: JLong, 0: JInt, "ccc", 3L: JLong), true), 20011)) // clean-up 6500
+
+ assert(testHarness.numKeyedStateEntries() > 0) // check that we have state
+ testHarness.setProcessingTime(6500) // clean-up attempt but rescheduled to 8500
+ assert(testHarness.numKeyedStateEntries() > 0) // check that we have state
+
+ testHarness.processWatermark(20020) // schedule emission
+
+ assert(testHarness.numKeyedStateEntries() > 0) // check that we have state
+ testHarness.setProcessingTime(8499) // clean-up
+ assert(testHarness.numKeyedStateEntries() > 0) // check that we have state
+ testHarness.setProcessingTime(8500) // clean-up
+ assert(testHarness.numKeyedStateEntries() == 0) // check that all state is gone
+
+
val result = testHarness.getOutput
val expectedOutput = new ConcurrentLinkedQueue[Object]()
@@ -736,6 +690,16 @@ class OverWindowHarnessTest extends HarnessTestBase{
CRow(
Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 20L: JLong, 40L: JLong), true), 12001))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 0L: JLong, 0: JInt, "ccc", 1L: JLong, 1L: JLong, 1L: JLong), true), 20001))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(2: JInt, 0L: JLong, 0: JInt, "ccc", 2L: JLong, 1L: JLong, 2L: JLong), true), 20002))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(3: JInt, 0L: JLong, 0: JInt, "ccc", 3L: JLong, 3L: JLong, 3L: JLong), true), 20011))
+
verify(expectedOutput, result, new RowResultSortComparator(6))
testHarness.close()
}
@@ -748,9 +712,10 @@ class OverWindowHarnessTest extends HarnessTestBase{
val processFunction = new KeyedProcessOperator[String, CRow, CRow](
new RowTimeUnboundedRangeOver(
- genAggFunction,
- aggregationStateType,
- cRT))
+ genMinMaxAggFunction,
+ minMaxAggregationStateType,
+ minMaxCRowType,
+ new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(1), Time.seconds(2))))
val testHarness =
createHarnessTester(
@@ -760,6 +725,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
testHarness.open()
+ testHarness.setProcessingTime(1000)
testHarness.processWatermark(800)
testHarness.processElement(new StreamRecord(
CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 801))
@@ -806,6 +772,30 @@ class OverWindowHarnessTest extends HarnessTestBase{
testHarness.processWatermark(19000)
+ // test cleanup
+ assert(testHarness.numKeyedStateEntries() > 0)
+ testHarness.setProcessingTime(2999) // clean up timer is 3000, so nothing should happen
+ assert(testHarness.numKeyedStateEntries() > 0)
+ testHarness.setProcessingTime(3000) // clean up is triggered
+ assert(testHarness.numKeyedStateEntries() == 0)
+
+ testHarness.processWatermark(20000)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 0L: JLong, 0: JInt, "ccc", 1L: JLong), true), 20001)) // clean-up 5000
+ testHarness.setProcessingTime(2500)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "ccc", 2L: JLong), true), 20002)) // clean-up 5000
+
+ assert(testHarness.numKeyedStateEntries() > 0)
+ testHarness.setProcessingTime(5000) // does not clean up, because data left. New timer 7000
+ testHarness.processWatermark(20010) // compute output
+
+ assert(testHarness.numKeyedStateEntries() > 0)
+ testHarness.setProcessingTime(6999) // clean up timer is 3000, so nothing should happen
+ assert(testHarness.numKeyedStateEntries() > 0)
+ testHarness.setProcessingTime(7000) // clean up is triggered
+ assert(testHarness.numKeyedStateEntries() == 0)
+
val result = testHarness.getOutput
val expectedOutput = new ConcurrentLinkedQueue[Object]()
@@ -854,6 +844,13 @@ class OverWindowHarnessTest extends HarnessTestBase{
CRow(
Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 10L: JLong, 40L: JLong), true), 12001))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 0L: JLong, 0: JInt, "ccc", 1L: JLong, 1L: JLong, 1L: JLong), true), 20001))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(2: JInt, 0L: JLong, 0: JInt, "ccc", 2L: JLong, 1L: JLong, 2L: JLong), true), 20002))
+
verify(expectedOutput, result, new RowResultSortComparator(6))
testHarness.close()
}
@@ -863,9 +860,10 @@ class OverWindowHarnessTest extends HarnessTestBase{
val processFunction = new KeyedProcessOperator[String, CRow, CRow](
new RowTimeUnboundedRowsOver(
- genAggFunction,
- aggregationStateType,
- cRT))
+ genMinMaxAggFunction,
+ minMaxAggregationStateType,
+ minMaxCRowType,
+ new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(1), Time.seconds(2))))
val testHarness =
createHarnessTester(
@@ -875,6 +873,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
testHarness.open()
+ testHarness.setProcessingTime(1000)
testHarness.processWatermark(800)
testHarness.processElement(new StreamRecord(
CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 801))
@@ -921,6 +920,30 @@ class OverWindowHarnessTest extends HarnessTestBase{
testHarness.processWatermark(19000)
+ // test cleanup
+ assert(testHarness.numKeyedStateEntries() > 0)
+ testHarness.setProcessingTime(2999) // clean up timer is 3000, so nothing should happen
+ assert(testHarness.numKeyedStateEntries() > 0)
+ testHarness.setProcessingTime(3000) // clean up is triggered
+ assert(testHarness.numKeyedStateEntries() == 0)
+
+ testHarness.processWatermark(20000)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 0L: JLong, 0: JInt, "ccc", 1L: JLong), true), 20001)) // clean-up 5000
+ testHarness.setProcessingTime(2500)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "ccc", 2L: JLong), true), 20002)) // clean-up 5000
+
+ assert(testHarness.numKeyedStateEntries() > 0)
+ testHarness.setProcessingTime(5000) // does not clean up, because data left. New timer 7000
+ testHarness.processWatermark(20010) // compute output
+
+ assert(testHarness.numKeyedStateEntries() > 0)
+ testHarness.setProcessingTime(6999) // clean up timer is 3000, so nothing should happen
+ assert(testHarness.numKeyedStateEntries() > 0)
+ testHarness.setProcessingTime(7000) // clean up is triggered
+ assert(testHarness.numKeyedStateEntries() == 0)
+
val result = testHarness.getOutput
val expectedOutput = new ConcurrentLinkedQueue[Object]()
@@ -968,6 +991,13 @@ class OverWindowHarnessTest extends HarnessTestBase{
CRow(
Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 10L: JLong, 40L: JLong), true), 12001))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 0L: JLong, 0: JInt, "ccc", 1L: JLong, 1L: JLong, 1L: JLong), true), 20001))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(2: JInt, 0L: JLong, 0: JInt, "ccc", 2L: JLong, 1L: JLong, 2L: JLong), true), 20002))
+
verify(expectedOutput, result, new RowResultSortComparator(6))
testHarness.close()
}
http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
index 3d79e22..c4e2433 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
@@ -28,7 +28,7 @@ class MockTableEnvironment extends TableEnvironment(new TableConfig) {
override private[flink] def writeToSink[T](
table: Table,
sink: TableSink[T],
- qConfig: QueryConfig): Unit = ???
+ queryConfig: QueryConfig): Unit = ???
override protected def checkValidTableName(name: String): Unit = ???