Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/03 12:10:51 UTC
[44/50] [abbrv] flink git commit: [FLINK-6216] [table] Add non-windowed GroupBy aggregation for streams.
[FLINK-6216] [table] Add non-windowed GroupBy aggregation for streams.
This closes #3646.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/24fa1a1c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/24fa1a1c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/24fa1a1c
Branch: refs/heads/table-retraction
Commit: 24fa1a1c2516726a5ecd446bc79b9fb6664bec38
Parents: 6181302
Author: shaoxuan-wang <ws...@gmail.com>
Authored: Thu Mar 30 03:57:58 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed May 3 11:27:12 2017 +0200
----------------------------------------------------------------------
.../flink/table/plan/logical/operators.scala | 3 -
.../nodes/datastream/DataStreamAggregate.scala | 272 -------------------
.../datastream/DataStreamGroupAggregate.scala | 140 ++++++++++
.../DataStreamGroupWindowAggregate.scala | 272 +++++++++++++++++++
.../flink/table/plan/rules/FlinkRuleSets.scala | 3 +-
.../datastream/DataStreamAggregateRule.scala | 76 ------
.../DataStreamGroupAggregateRule.scala | 79 ++++++
.../DataStreamGroupWindowAggregateRule.scala | 76 ++++++
.../table/runtime/aggregate/AggregateUtil.scala | 63 ++++-
.../aggregate/GroupAggProcessFunction.scala | 90 ++++++
.../aggregate/ProcTimeBoundedRangeOver.scala | 2 +-
.../scala/batch/table/FieldProjectionTest.scala | 4 +-
.../table/api/scala/stream/sql/SqlITCase.scala | 21 ++
.../scala/stream/sql/WindowAggregateTest.scala | 46 ++--
.../scala/stream/table/AggregationsITCase.scala | 180 ------------
.../stream/table/GroupAggregationsITCase.scala | 132 +++++++++
.../stream/table/GroupAggregationsTest.scala | 214 +++++++++++++++
.../table/GroupWindowAggregationsITCase.scala | 180 ++++++++++++
.../scala/stream/table/GroupWindowTest.scala | 56 ++--
.../scala/stream/table/UnsupportedOpsTest.scala | 7 -
20 files changed, 1318 insertions(+), 598 deletions(-)
----------------------------------------------------------------------
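The following minimal Scala sketch illustrates the non-windowed GroupBy aggregation that this commit enables on streaming tables. It is modeled on the testUnboundedGroupby case added to SqlITCase below; the wrapper object, the inline sample data, and the Table API variant are illustrative and not part of the commit itself.

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object UnboundedGroupByExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // illustrative sample data; the test uses StreamTestData.getSmall3TupleDataStream
    val input = env.fromElements((1, 1L, "Hi"), (2, 2L, "Hello"), (3, 2L, "Hello world"))
    val t = input.toTable(tEnv).as('a, 'b, 'c)
    tEnv.registerTable("MyTable", t)

    // SQL form, as in SqlITCase#testUnboundedGroupby
    val sqlResult = tEnv.sql("SELECT b, COUNT(a) FROM MyTable GROUP BY b").toDataStream[Row]

    // Table API form (illustrative; this path is covered by GroupAggregationsITCase)
    val tableResult = t.groupBy('b).select('b, 'a.count).toDataStream[Row]

    // each incoming row updates the running aggregate for its key, which is why
    // the SQL test expects the intermediate result "2,1" before "2,2"
    sqlResult.print()
    env.execute()
  }
}
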
http://git-wip-us.apache.org/repos/asf/flink/blob/24fa1a1c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
index f1bb644..66b26ed 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
@@ -221,9 +221,6 @@ case class Aggregate(
}
override def validate(tableEnv: TableEnvironment): LogicalNode = {
- if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
- failValidation(s"Aggregate on stream tables is currently not supported.")
- }
val resolvedAggregate = super.validate(tableEnv).asInstanceOf[Aggregate]
val groupingExprs = resolvedAggregate.groupingExpressions
http://git-wip-us.apache.org/repos/asf/flink/blob/24fa1a1c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala
deleted file mode 100644
index 187773d..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.plan.nodes.datastream
-
-import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.AggregateCall
-import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
-import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream}
-import org.apache.flink.streaming.api.windowing.assigners._
-import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
-import org.apache.flink.table.api.StreamTableEnvironment
-import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
-import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.codegen.CodeGenerator
-import org.apache.flink.table.expressions._
-import org.apache.flink.table.plan.logical._
-import org.apache.flink.table.plan.nodes.CommonAggregate
-import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate._
-import org.apache.flink.table.runtime.aggregate.AggregateUtil._
-import org.apache.flink.table.runtime.aggregate._
-import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
-import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
-import org.apache.flink.types.Row
-
-class DataStreamAggregate(
- window: LogicalWindow,
- namedProperties: Seq[NamedWindowProperty],
- cluster: RelOptCluster,
- traitSet: RelTraitSet,
- inputNode: RelNode,
- namedAggregates: Seq[CalcitePair[AggregateCall, String]],
- rowRelDataType: RelDataType,
- inputType: RelDataType,
- grouping: Array[Int])
- extends SingleRel(cluster, traitSet, inputNode) with CommonAggregate with DataStreamRel {
-
- override def deriveRowType(): RelDataType = rowRelDataType
-
- override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
- new DataStreamAggregate(
- window,
- namedProperties,
- cluster,
- traitSet,
- inputs.get(0),
- namedAggregates,
- getRowType,
- inputType,
- grouping)
- }
-
- override def toString: String = {
- s"Aggregate(${
- if (!grouping.isEmpty) {
- s"groupBy: (${groupingToString(inputType, grouping)}), "
- } else {
- ""
- }
- }window: ($window), " +
- s"select: (${
- aggregationToString(
- inputType,
- grouping,
- getRowType,
- namedAggregates,
- namedProperties)
- }))"
- }
-
- override def explainTerms(pw: RelWriter): RelWriter = {
- super.explainTerms(pw)
- .itemIf("groupBy", groupingToString(inputType, grouping), !grouping.isEmpty)
- .item("window", window)
- .item(
- "select", aggregationToString(
- inputType,
- grouping,
- getRowType,
- namedAggregates,
- namedProperties))
- }
-
- override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
-
- val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
-
- val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
-
- val aggString = aggregationToString(
- inputType,
- grouping,
- getRowType,
- namedAggregates,
- namedProperties)
-
- val keyedAggOpName = s"groupBy: (${groupingToString(inputType, grouping)}), " +
- s"window: ($window), " +
- s"select: ($aggString)"
- val nonKeyedAggOpName = s"window: ($window), select: ($aggString)"
-
- val generator = new CodeGenerator(
- tableEnv.getConfig,
- false,
- inputDS.getType)
-
- // grouped / keyed aggregation
- if (grouping.length > 0) {
- val windowFunction = AggregateUtil.createAggregationGroupWindowFunction(
- window,
- grouping.length,
- namedAggregates.size,
- rowRelDataType.getFieldCount,
- namedProperties)
-
- val keyedStream = inputDS.keyBy(grouping: _*)
- val windowedStream =
- createKeyedWindowedStream(window, keyedStream)
- .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
-
- val (aggFunction, accumulatorRowType, aggResultRowType) =
- AggregateUtil.createDataStreamAggregateFunction(
- generator,
- namedAggregates,
- inputType,
- rowRelDataType)
-
- windowedStream
- .aggregate(aggFunction, windowFunction, accumulatorRowType, aggResultRowType, rowTypeInfo)
- .name(keyedAggOpName)
- }
- // global / non-keyed aggregation
- else {
- val windowFunction = AggregateUtil.createAggregationAllWindowFunction(
- window,
- rowRelDataType.getFieldCount,
- namedProperties)
-
- val windowedStream =
- createNonKeyedWindowedStream(window, inputDS)
- .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
-
- val (aggFunction, accumulatorRowType, aggResultRowType) =
- AggregateUtil.createDataStreamAggregateFunction(
- generator,
- namedAggregates,
- inputType,
- rowRelDataType)
-
- windowedStream
- .aggregate(aggFunction, windowFunction, accumulatorRowType, aggResultRowType, rowTypeInfo)
- .name(nonKeyedAggOpName)
- }
- }
-}
-
-object DataStreamAggregate {
-
-
- private def createKeyedWindowedStream(groupWindow: LogicalWindow, stream: KeyedStream[Row, Tuple])
- : WindowedStream[Row, Tuple, _ <: DataStreamWindow] = groupWindow match {
-
- case ProcessingTimeTumblingGroupWindow(_, size) if isTimeInterval(size.resultType) =>
- stream.window(TumblingProcessingTimeWindows.of(asTime(size)))
-
- case ProcessingTimeTumblingGroupWindow(_, size) =>
- stream.countWindow(asCount(size))
-
- case EventTimeTumblingGroupWindow(_, _, size) if isTimeInterval(size.resultType) =>
- stream.window(TumblingEventTimeWindows.of(asTime(size)))
-
- case EventTimeTumblingGroupWindow(_, _, size) =>
- // TODO: EventTimeTumblingGroupWindow should sort the stream on event time
- // before applying the windowing logic. Otherwise, this would be the same as a
- // ProcessingTimeTumblingGroupWindow
- throw new UnsupportedOperationException(
- "Event-time grouping windows on row intervals are currently not supported.")
-
- case ProcessingTimeSlidingGroupWindow(_, size, slide) if isTimeInterval(size.resultType) =>
- stream.window(SlidingProcessingTimeWindows.of(asTime(size), asTime(slide)))
-
- case ProcessingTimeSlidingGroupWindow(_, size, slide) =>
- stream.countWindow(asCount(size), asCount(slide))
-
- case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
- stream.window(SlidingEventTimeWindows.of(asTime(size), asTime(slide)))
-
- case EventTimeSlidingGroupWindow(_, _, size, slide) =>
- // TODO: EventTimeTumblingGroupWindow should sort the stream on event time
- // before applying the windowing logic. Otherwise, this would be the same as a
- // ProcessingTimeTumblingGroupWindow
- throw new UnsupportedOperationException(
- "Event-time grouping windows on row intervals are currently not supported.")
-
- case ProcessingTimeSessionGroupWindow(_, gap: Expression) =>
- stream.window(ProcessingTimeSessionWindows.withGap(asTime(gap)))
-
- case EventTimeSessionGroupWindow(_, _, gap) =>
- stream.window(EventTimeSessionWindows.withGap(asTime(gap)))
- }
-
- private def createNonKeyedWindowedStream(groupWindow: LogicalWindow, stream: DataStream[Row])
- : AllWindowedStream[Row, _ <: DataStreamWindow] = groupWindow match {
-
- case ProcessingTimeTumblingGroupWindow(_, size) if isTimeInterval(size.resultType) =>
- stream.windowAll(TumblingProcessingTimeWindows.of(asTime(size)))
-
- case ProcessingTimeTumblingGroupWindow(_, size) =>
- stream.countWindowAll(asCount(size))
-
- case EventTimeTumblingGroupWindow(_, _, size) if isTimeInterval(size.resultType) =>
- stream.windowAll(TumblingEventTimeWindows.of(asTime(size)))
-
- case EventTimeTumblingGroupWindow(_, _, size) =>
- // TODO: EventTimeTumblingGroupWindow should sort the stream on event time
- // before applying the windowing logic. Otherwise, this would be the same as a
- // ProcessingTimeTumblingGroupWindow
- throw new UnsupportedOperationException(
- "Event-time grouping windows on row intervals are currently not supported.")
-
- case ProcessingTimeSlidingGroupWindow(_, size, slide) if isTimeInterval(size.resultType) =>
- stream.windowAll(SlidingProcessingTimeWindows.of(asTime(size), asTime(slide)))
-
- case ProcessingTimeSlidingGroupWindow(_, size, slide) =>
- stream.countWindowAll(asCount(size), asCount(slide))
-
- case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
- stream.windowAll(SlidingEventTimeWindows.of(asTime(size), asTime(slide)))
-
- case EventTimeSlidingGroupWindow(_, _, size, slide) =>
- // TODO: EventTimeTumblingGroupWindow should sort the stream on event time
- // before applying the windowing logic. Otherwise, this would be the same as a
- // ProcessingTimeTumblingGroupWindow
- throw new UnsupportedOperationException(
- "Event-time grouping windows on row intervals are currently not supported.")
-
- case ProcessingTimeSessionGroupWindow(_, gap) =>
- stream.windowAll(ProcessingTimeSessionWindows.withGap(asTime(gap)))
-
- case EventTimeSessionGroupWindow(_, _, gap) =>
- stream.windowAll(EventTimeSessionWindows.withGap(asTime(gap)))
- }
-
- def asTime(expr: Expression): Time = expr match {
- case Literal(value: Long, TimeIntervalTypeInfo.INTERVAL_MILLIS) => Time.milliseconds(value)
- case _ => throw new IllegalArgumentException()
- }
-
- def asCount(expr: Expression): Long = expr match {
- case Literal(value: Long, RowIntervalTypeInfo.INTERVAL_ROWS) => value
- case _ => throw new IllegalArgumentException()
- }
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/24fa1a1c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
new file mode 100644
index 0000000..955d702
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.plan.nodes.CommonAggregate
+import org.apache.flink.types.Row
+import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
+
+/**
+ *
+ * Flink RelNode for data stream unbounded group aggregate
+ *
+ * @param cluster Cluster of the RelNode, representing an environment of related
+ * relational expressions during the optimization of a query.
+ * @param traitSet Trait set of the RelNode
+ * @param inputNode The input RelNode of aggregation
+ * @param namedAggregates List of calls to aggregate functions and their output field names
+ * @param rowRelDataType The type of the rows of the RelNode
+ * @param inputType The type of the rows of aggregation input RelNode
+ * @param groupings The position (in the input Row) of the grouping keys
+ */
+class DataStreamGroupAggregate(
+ cluster: RelOptCluster,
+ traitSet: RelTraitSet,
+ inputNode: RelNode,
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+ rowRelDataType: RelDataType,
+ inputType: RelDataType,
+ groupings: Array[Int])
+ extends SingleRel(cluster, traitSet, inputNode)
+ with CommonAggregate
+ with DataStreamRel {
+
+ override def deriveRowType() = rowRelDataType
+
+ override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+ new DataStreamGroupAggregate(
+ cluster,
+ traitSet,
+ inputs.get(0),
+ namedAggregates,
+ getRowType,
+ inputType,
+ groupings)
+ }
+
+ override def toString: String = {
+ s"Aggregate(${
+ if (!groupings.isEmpty) {
+ s"groupBy: (${groupingToString(inputType, groupings)}), "
+ } else {
+ ""
+ }
+ }select:(${aggregationToString(inputType, groupings, getRowType, namedAggregates, Nil)}))"
+ }
+
+ override def explainTerms(pw: RelWriter): RelWriter = {
+ super.explainTerms(pw)
+ .itemIf("groupBy", groupingToString(inputType, groupings), !groupings.isEmpty)
+ .item("select", aggregationToString(inputType, groupings, getRowType, namedAggregates, Nil))
+ }
+
+ override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
+
+ val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+
+ val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
+
+ val generator = new CodeGenerator(
+ tableEnv.getConfig,
+ false,
+ inputDS.getType)
+
+ val aggString = aggregationToString(
+ inputType,
+ groupings,
+ getRowType,
+ namedAggregates,
+ Nil)
+
+ val keyedAggOpName = s"groupBy: (${groupingToString(inputType, groupings)}), " +
+ s"select: ($aggString)"
+ val nonKeyedAggOpName = s"select: ($aggString)"
+
+ val processFunction = AggregateUtil.createGroupAggregateFunction(
+ generator,
+ namedAggregates,
+ inputType,
+ groupings)
+
+ val result: DataStream[Row] =
+ // grouped / keyed aggregation
+ if (groupings.nonEmpty) {
+ inputDS
+ .keyBy(groupings: _*)
+ .process(processFunction)
+ .returns(rowTypeInfo)
+ .name(keyedAggOpName)
+ .asInstanceOf[DataStream[Row]]
+ }
+ // global / non-keyed aggregation
+ else {
+ inputDS
+ .keyBy(new NullByteKeySelector[Row])
+ .process(processFunction)
+ .setParallelism(1)
+ .setMaxParallelism(1)
+ .returns(rowTypeInfo)
+ .name(nonKeyedAggOpName)
+ .asInstanceOf[DataStream[Row]]
+ }
+ result
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/24fa1a1c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
new file mode 100644
index 0000000..752dbbe
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream}
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.plan.nodes.CommonAggregate
+import org.apache.flink.table.plan.nodes.datastream.DataStreamGroupWindowAggregate._
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
+import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
+import org.apache.flink.types.Row
+
+class DataStreamGroupWindowAggregate(
+ window: LogicalWindow,
+ namedProperties: Seq[NamedWindowProperty],
+ cluster: RelOptCluster,
+ traitSet: RelTraitSet,
+ inputNode: RelNode,
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+ rowRelDataType: RelDataType,
+ inputType: RelDataType,
+ grouping: Array[Int])
+ extends SingleRel(cluster, traitSet, inputNode) with CommonAggregate with DataStreamRel {
+
+ override def deriveRowType(): RelDataType = rowRelDataType
+
+ override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+ new DataStreamGroupWindowAggregate(
+ window,
+ namedProperties,
+ cluster,
+ traitSet,
+ inputs.get(0),
+ namedAggregates,
+ getRowType,
+ inputType,
+ grouping)
+ }
+
+ override def toString: String = {
+ s"Aggregate(${
+ if (!grouping.isEmpty) {
+ s"groupBy: (${groupingToString(inputType, grouping)}), "
+ } else {
+ ""
+ }
+ }window: ($window), " +
+ s"select: (${
+ aggregationToString(
+ inputType,
+ grouping,
+ getRowType,
+ namedAggregates,
+ namedProperties)
+ }))"
+ }
+
+ override def explainTerms(pw: RelWriter): RelWriter = {
+ super.explainTerms(pw)
+ .itemIf("groupBy", groupingToString(inputType, grouping), !grouping.isEmpty)
+ .item("window", window)
+ .item(
+ "select", aggregationToString(
+ inputType,
+ grouping,
+ getRowType,
+ namedAggregates,
+ namedProperties))
+ }
+
+ override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
+
+ val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+
+ val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
+
+ val aggString = aggregationToString(
+ inputType,
+ grouping,
+ getRowType,
+ namedAggregates,
+ namedProperties)
+
+ val keyedAggOpName = s"groupBy: (${groupingToString(inputType, grouping)}), " +
+ s"window: ($window), " +
+ s"select: ($aggString)"
+ val nonKeyedAggOpName = s"window: ($window), select: ($aggString)"
+
+ val generator = new CodeGenerator(
+ tableEnv.getConfig,
+ false,
+ inputDS.getType)
+
+ // grouped / keyed aggregation
+ if (grouping.length > 0) {
+ val windowFunction = AggregateUtil.createAggregationGroupWindowFunction(
+ window,
+ grouping.length,
+ namedAggregates.size,
+ rowRelDataType.getFieldCount,
+ namedProperties)
+
+ val keyedStream = inputDS.keyBy(grouping: _*)
+ val windowedStream =
+ createKeyedWindowedStream(window, keyedStream)
+ .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
+
+ val (aggFunction, accumulatorRowType, aggResultRowType) =
+ AggregateUtil.createDataStreamAggregateFunction(
+ generator,
+ namedAggregates,
+ inputType,
+ rowRelDataType)
+
+ windowedStream
+ .aggregate(aggFunction, windowFunction, accumulatorRowType, aggResultRowType, rowTypeInfo)
+ .name(keyedAggOpName)
+ }
+ // global / non-keyed aggregation
+ else {
+ val windowFunction = AggregateUtil.createAggregationAllWindowFunction(
+ window,
+ rowRelDataType.getFieldCount,
+ namedProperties)
+
+ val windowedStream =
+ createNonKeyedWindowedStream(window, inputDS)
+ .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
+
+ val (aggFunction, accumulatorRowType, aggResultRowType) =
+ AggregateUtil.createDataStreamAggregateFunction(
+ generator,
+ namedAggregates,
+ inputType,
+ rowRelDataType)
+
+ windowedStream
+ .aggregate(aggFunction, windowFunction, accumulatorRowType, aggResultRowType, rowTypeInfo)
+ .name(nonKeyedAggOpName)
+ }
+ }
+}
+
+object DataStreamGroupWindowAggregate {
+
+
+ private def createKeyedWindowedStream(groupWindow: LogicalWindow, stream: KeyedStream[Row, Tuple])
+ : WindowedStream[Row, Tuple, _ <: DataStreamWindow] = groupWindow match {
+
+ case ProcessingTimeTumblingGroupWindow(_, size) if isTimeInterval(size.resultType) =>
+ stream.window(TumblingProcessingTimeWindows.of(asTime(size)))
+
+ case ProcessingTimeTumblingGroupWindow(_, size) =>
+ stream.countWindow(asCount(size))
+
+ case EventTimeTumblingGroupWindow(_, _, size) if isTimeInterval(size.resultType) =>
+ stream.window(TumblingEventTimeWindows.of(asTime(size)))
+
+ case EventTimeTumblingGroupWindow(_, _, size) =>
+ // TODO: EventTimeTumblingGroupWindow should sort the stream on event time
+ // before applying the windowing logic. Otherwise, this would be the same as a
+ // ProcessingTimeTumblingGroupWindow
+ throw new UnsupportedOperationException(
+ "Event-time grouping windows on row intervals are currently not supported.")
+
+ case ProcessingTimeSlidingGroupWindow(_, size, slide) if isTimeInterval(size.resultType) =>
+ stream.window(SlidingProcessingTimeWindows.of(asTime(size), asTime(slide)))
+
+ case ProcessingTimeSlidingGroupWindow(_, size, slide) =>
+ stream.countWindow(asCount(size), asCount(slide))
+
+ case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
+ stream.window(SlidingEventTimeWindows.of(asTime(size), asTime(slide)))
+
+ case EventTimeSlidingGroupWindow(_, _, size, slide) =>
+ // TODO: EventTimeTumblingGroupWindow should sort the stream on event time
+ // before applying the windowing logic. Otherwise, this would be the same as a
+ // ProcessingTimeTumblingGroupWindow
+ throw new UnsupportedOperationException(
+ "Event-time grouping windows on row intervals are currently not supported.")
+
+ case ProcessingTimeSessionGroupWindow(_, gap: Expression) =>
+ stream.window(ProcessingTimeSessionWindows.withGap(asTime(gap)))
+
+ case EventTimeSessionGroupWindow(_, _, gap) =>
+ stream.window(EventTimeSessionWindows.withGap(asTime(gap)))
+ }
+
+ private def createNonKeyedWindowedStream(groupWindow: LogicalWindow, stream: DataStream[Row])
+ : AllWindowedStream[Row, _ <: DataStreamWindow] = groupWindow match {
+
+ case ProcessingTimeTumblingGroupWindow(_, size) if isTimeInterval(size.resultType) =>
+ stream.windowAll(TumblingProcessingTimeWindows.of(asTime(size)))
+
+ case ProcessingTimeTumblingGroupWindow(_, size) =>
+ stream.countWindowAll(asCount(size))
+
+ case EventTimeTumblingGroupWindow(_, _, size) if isTimeInterval(size.resultType) =>
+ stream.windowAll(TumblingEventTimeWindows.of(asTime(size)))
+
+ case EventTimeTumblingGroupWindow(_, _, size) =>
+ // TODO: EventTimeTumblingGroupWindow should sort the stream on event time
+ // before applying the windowing logic. Otherwise, this would be the same as a
+ // ProcessingTimeTumblingGroupWindow
+ throw new UnsupportedOperationException(
+ "Event-time grouping windows on row intervals are currently not supported.")
+
+ case ProcessingTimeSlidingGroupWindow(_, size, slide) if isTimeInterval(size.resultType) =>
+ stream.windowAll(SlidingProcessingTimeWindows.of(asTime(size), asTime(slide)))
+
+ case ProcessingTimeSlidingGroupWindow(_, size, slide) =>
+ stream.countWindowAll(asCount(size), asCount(slide))
+
+ case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
+ stream.windowAll(SlidingEventTimeWindows.of(asTime(size), asTime(slide)))
+
+ case EventTimeSlidingGroupWindow(_, _, size, slide) =>
+ // TODO: EventTimeTumblingGroupWindow should sort the stream on event time
+ // before applying the windowing logic. Otherwise, this would be the same as a
+ // ProcessingTimeTumblingGroupWindow
+ throw new UnsupportedOperationException(
+ "Event-time grouping windows on row intervals are currently not supported.")
+
+ case ProcessingTimeSessionGroupWindow(_, gap) =>
+ stream.windowAll(ProcessingTimeSessionWindows.withGap(asTime(gap)))
+
+ case EventTimeSessionGroupWindow(_, _, gap) =>
+ stream.windowAll(EventTimeSessionWindows.withGap(asTime(gap)))
+ }
+
+ def asTime(expr: Expression): Time = expr match {
+ case Literal(value: Long, TimeIntervalTypeInfo.INTERVAL_MILLIS) => Time.milliseconds(value)
+ case _ => throw new IllegalArgumentException()
+ }
+
+ def asCount(expr: Expression): Long = expr match {
+ case Literal(value: Long, RowIntervalTypeInfo.INTERVAL_ROWS) => value
+ case _ => throw new IllegalArgumentException()
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/24fa1a1c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index 0bee4e5..c16b469 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -173,8 +173,9 @@ object FlinkRuleSets {
*/
val DATASTREAM_OPT_RULES: RuleSet = RuleSets.ofList(
// translate to DataStream nodes
+ DataStreamGroupAggregateRule.INSTANCE,
DataStreamOverAggregateRule.INSTANCE,
- DataStreamAggregateRule.INSTANCE,
+ DataStreamGroupWindowAggregateRule.INSTANCE,
DataStreamCalcRule.INSTANCE,
DataStreamScanRule.INSTANCE,
DataStreamUnionRule.INSTANCE,
http://git-wip-us.apache.org/repos/asf/flink/blob/24fa1a1c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala
deleted file mode 100644
index f011b66..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.plan.rules.datastream
-
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.flink.table.api.TableException
-import org.apache.flink.table.plan.nodes.FlinkConventions
-import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate
-import org.apache.flink.table.plan.nodes.logical.FlinkLogicalWindowAggregate
-
-import scala.collection.JavaConversions._
-
-class DataStreamAggregateRule
- extends ConverterRule(
- classOf[FlinkLogicalWindowAggregate],
- FlinkConventions.LOGICAL,
- FlinkConventions.DATASTREAM,
- "DataStreamAggregateRule") {
-
- override def matches(call: RelOptRuleCall): Boolean = {
- val agg: FlinkLogicalWindowAggregate = call.rel(0).asInstanceOf[FlinkLogicalWindowAggregate]
-
- // check if we have distinct aggregates
- val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
- if (distinctAggs) {
- throw TableException("DISTINCT aggregates are currently not supported.")
- }
-
- // check if we have grouping sets
- val groupSets = agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) != agg.getGroupSet
- if (groupSets || agg.indicator) {
- throw TableException("GROUPING SETS are currently not supported.")
- }
-
- !distinctAggs && !groupSets && !agg.indicator
- }
-
- override def convert(rel: RelNode): RelNode = {
- val agg: FlinkLogicalWindowAggregate = rel.asInstanceOf[FlinkLogicalWindowAggregate]
- val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASTREAM)
- val convInput: RelNode = RelOptRule.convert(agg.getInput, FlinkConventions.DATASTREAM)
-
- new DataStreamAggregate(
- agg.getWindow,
- agg.getNamedProperties,
- rel.getCluster,
- traitSet,
- convInput,
- agg.getNamedAggCalls,
- rel.getRowType,
- agg.getInput.getRowType,
- agg.getGroupSet.toArray)
- }
- }
-
-object DataStreamAggregateRule {
- val INSTANCE: RelOptRule = new DataStreamAggregateRule
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/24fa1a1c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamGroupAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamGroupAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamGroupAggregateRule.scala
new file mode 100644
index 0000000..a65c378
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamGroupAggregateRule.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalAggregate
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.datastream.DataStreamGroupAggregate
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalAggregate
+
+import scala.collection.JavaConversions._
+
+/**
+ * Rule to convert a [[LogicalAggregate]] into a [[DataStreamGroupAggregate]].
+ */
+class DataStreamGroupAggregateRule
+ extends ConverterRule(
+ classOf[FlinkLogicalAggregate],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.DATASTREAM,
+ "DataStreamGroupAggregateRule") {
+
+ override def matches(call: RelOptRuleCall): Boolean = {
+ val agg: FlinkLogicalAggregate = call.rel(0).asInstanceOf[FlinkLogicalAggregate]
+
+ // check if we have distinct aggregates
+ val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
+ if (distinctAggs) {
+ throw TableException("DISTINCT aggregates are currently not supported.")
+ }
+
+ // check if we have grouping sets
+ val groupSets = agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) != agg.getGroupSet
+ if (groupSets || agg.indicator) {
+ throw TableException("GROUPING SETS are currently not supported.")
+ }
+
+ !distinctAggs && !groupSets && !agg.indicator
+ }
+
+ override def convert(rel: RelNode): RelNode = {
+ val agg: FlinkLogicalAggregate = rel.asInstanceOf[FlinkLogicalAggregate]
+ val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASTREAM)
+ val convInput: RelNode = RelOptRule.convert(agg.getInput, FlinkConventions.DATASTREAM)
+
+ new DataStreamGroupAggregate(
+ rel.getCluster,
+ traitSet,
+ convInput,
+ agg.getNamedAggCalls,
+ rel.getRowType,
+ agg.getInput.getRowType,
+ agg.getGroupSet.toArray)
+ }
+}
+
+object DataStreamGroupAggregateRule {
+ val INSTANCE: RelOptRule = new DataStreamGroupAggregateRule
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/24fa1a1c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamGroupWindowAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamGroupWindowAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamGroupWindowAggregateRule.scala
new file mode 100644
index 0000000..fdf44a6
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamGroupWindowAggregateRule.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.datastream.DataStreamGroupWindowAggregate
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalWindowAggregate
+
+import scala.collection.JavaConversions._
+
+class DataStreamGroupWindowAggregateRule
+ extends ConverterRule(
+ classOf[FlinkLogicalWindowAggregate],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.DATASTREAM,
+ "DataStreamGroupWindowAggregateRule") {
+
+ override def matches(call: RelOptRuleCall): Boolean = {
+ val agg: FlinkLogicalWindowAggregate = call.rel(0).asInstanceOf[FlinkLogicalWindowAggregate]
+
+ // check if we have distinct aggregates
+ val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
+ if (distinctAggs) {
+ throw TableException("DISTINCT aggregates are currently not supported.")
+ }
+
+ // check if we have grouping sets
+ val groupSets = agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) != agg.getGroupSet
+ if (groupSets || agg.indicator) {
+ throw TableException("GROUPING SETS are currently not supported.")
+ }
+
+ !distinctAggs && !groupSets && !agg.indicator
+ }
+
+ override def convert(rel: RelNode): RelNode = {
+ val agg: FlinkLogicalWindowAggregate = rel.asInstanceOf[FlinkLogicalWindowAggregate]
+ val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASTREAM)
+ val convInput: RelNode = RelOptRule.convert(agg.getInput, FlinkConventions.DATASTREAM)
+
+ new DataStreamGroupWindowAggregate(
+ agg.getWindow,
+ agg.getNamedProperties,
+ rel.getCluster,
+ traitSet,
+ convInput,
+ agg.getNamedAggCalls,
+ rel.getRowType,
+ agg.getInput.getRowType,
+ agg.getGroupSet.toArray)
+ }
+ }
+
+object DataStreamGroupWindowAggregateRule {
+ val INSTANCE: RelOptRule = new DataStreamGroupWindowAggregateRule
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/24fa1a1c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index a82f383..f93d870 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -81,8 +81,7 @@ object AggregateUtil {
inputType,
needRetract)
- val aggregationStateType: RowTypeInfo =
- createDataSetAggregateBufferDataType(Array(), aggregates, inputType)
+ val aggregationStateType: RowTypeInfo = createAccumulatorRowType(aggregates)
val forwardMapping = (0 until inputType.getFieldCount).toArray
val aggMapping = aggregates.indices.map(x => x + inputType.getFieldCount).toArray
@@ -132,7 +131,55 @@ object AggregateUtil {
}
/**
- * Create an [[org.apache.flink.streaming.api.functions.ProcessFunction]] for
+ * Create an [[org.apache.flink.streaming.api.functions.ProcessFunction]] for a non-windowed
+ * group aggregate to evaluate the final aggregate value.
+ *
+ * @param generator code generator instance
+ * @param namedAggregates List of calls to aggregate functions and their output field names
+ * @param inputType Input row type
+ * @param groupings the position (in the input Row) of the grouping keys
+ * @return [[org.apache.flink.streaming.api.functions.ProcessFunction]]
+ */
+ private[flink] def createGroupAggregateFunction(
+ generator: CodeGenerator,
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+ inputType: RelDataType,
+ groupings: Array[Int]): ProcessFunction[Row, Row] = {
+
+ val (aggFields, aggregates) =
+ transformToAggregateFunctions(
+ namedAggregates.map(_.getKey),
+ inputType,
+ needRetraction = false)
+ val aggMapping = aggregates.indices.map(_ + groupings.length).toArray
+
+ val outputArity = groupings.length + aggregates.length
+
+ val aggregationStateType: RowTypeInfo = createAccumulatorRowType(aggregates)
+
+ val genFunction = generator.generateAggregations(
+ "NonWindowedAggregationHelper",
+ generator,
+ inputType,
+ aggregates,
+ aggFields,
+ aggMapping,
+ partialResults = false,
+ groupings,
+ None,
+ None,
+ outputArity,
+ needRetract = false,
+ needMerge = false
+ )
+
+ new GroupAggProcessFunction(
+ genFunction,
+ aggregationStateType)
+ }
+
+ /**
+ * Create an [[org.apache.flink.streaming.api.functions.ProcessFunction]] for ROWS clause
* bounded OVER window to evaluate final aggregate value.
*
* @param generator code generator instance
@@ -253,7 +300,7 @@ object AggregateUtil {
needRetract)
val mapReturnType: RowTypeInfo =
- createDataSetAggregateBufferDataType(
+ createRowTypeForKeysAndAggregates(
groupings,
aggregates,
inputType,
@@ -355,7 +402,7 @@ object AggregateUtil {
inputType,
needRetract)
- val returnType: RowTypeInfo = createDataSetAggregateBufferDataType(
+ val returnType: RowTypeInfo = createRowTypeForKeysAndAggregates(
groupings,
aggregates,
inputType,
@@ -617,7 +664,7 @@ object AggregateUtil {
window match {
case EventTimeSessionGroupWindow(_, _, gap) =>
val combineReturnType: RowTypeInfo =
- createDataSetAggregateBufferDataType(
+ createRowTypeForKeysAndAggregates(
groupings,
aggregates,
inputType,
@@ -689,7 +736,7 @@ object AggregateUtil {
case EventTimeSessionGroupWindow(_, _, gap) =>
val combineReturnType: RowTypeInfo =
- createDataSetAggregateBufferDataType(
+ createRowTypeForKeysAndAggregates(
groupings,
aggregates,
inputType,
@@ -1297,7 +1344,7 @@ object AggregateUtil {
aggTypes
}
- private def createDataSetAggregateBufferDataType(
+ private def createRowTypeForKeysAndAggregates(
groupings: Array[Int],
aggregates: Array[TableAggregateFunction[_, _]],
inputType: RelDataType,
http://git-wip-us.apache.org/repos/asf/flink/blob/24fa1a1c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
new file mode 100644
index 0000000..81c900c
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
+import org.slf4j.LoggerFactory
+
+/**
+ * ProcessFunction used for the non-windowed group-by aggregate
+ *
+ * @param genAggregations Generated aggregate helper function
+ * @param aggregationStateType The row type info of aggregation
+ */
+class GroupAggProcessFunction(
+ private val genAggregations: GeneratedAggregationsFunction,
+ private val aggregationStateType: RowTypeInfo)
+ extends ProcessFunction[Row, Row]
+ with Compiler[GeneratedAggregations] {
+
+ val LOG = LoggerFactory.getLogger(this.getClass)
+ private var function: GeneratedAggregations = _
+
+ private var output: Row = _
+ private var state: ValueState[Row] = _
+
+ override def open(config: Configuration) {
+ LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
+ s"Code:\n$genAggregations.code")
+ val clazz = compile(
+ getRuntimeContext.getUserCodeClassLoader,
+ genAggregations.name,
+ genAggregations.code)
+ LOG.debug("Instantiating AggregateHelper.")
+ function = clazz.newInstance()
+ output = function.createOutputRow()
+
+ val stateDescriptor: ValueStateDescriptor[Row] =
+ new ValueStateDescriptor[Row]("GroupAggregateState", aggregationStateType)
+ state = getRuntimeContext.getState(stateDescriptor)
+ }
+
+ override def processElement(
+ input: Row,
+ ctx: ProcessFunction[Row, Row]#Context,
+ out: Collector[Row]): Unit = {
+
+ // get accumulators
+ var accumulators = state.value()
+ if (null == accumulators) {
+ accumulators = function.createAccumulators()
+ }
+
+ // Set group keys value to the final output
+ function.setForwardedFields(input, output)
+
+ // accumulate new input row
+ function.accumulate(accumulators, input)
+
+ // set aggregation results to output
+ function.setAggregationResults(accumulators, output)
+
+ // update accumulators
+ state.update(accumulators)
+
+ out.collect(output)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/24fa1a1c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
index 7f87e50..b63eb81 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
@@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory
* Process Function used for the aggregate in bounded proc-time OVER window
* [[org.apache.flink.streaming.api.datastream.DataStream]]
*
- * @param genAggregations Generated aggregate helper function
+ * @param genAggregations Generated aggregate helper function
* @param precedingTimeBoundary Is used to indicate the processing time boundaries
* @param aggregatesTypeInfo row type info of aggregation
* @param inputType row type info of input row
http://git-wip-us.apache.org/repos/asf/flink/blob/24fa1a1c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
index 6ebfec0..033019b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
@@ -231,7 +231,7 @@ class FieldProjectionTest extends TableTestBase {
val expected =
unaryNode(
- "DataStreamAggregate",
+ "DataStreamGroupWindowAggregate",
unaryNode(
"DataStreamCalc",
streamTableNode(0),
@@ -259,7 +259,7 @@ class FieldProjectionTest extends TableTestBase {
val expected = unaryNode(
"DataStreamCalc",
unaryNode(
- "DataStreamAggregate",
+ "DataStreamGroupWindowAggregate",
unaryNode(
"DataStreamCalc",
streamTableNode(0),
http://git-wip-us.apache.org/repos/asf/flink/blob/24fa1a1c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
index 67d13b0..f7bdccf 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
@@ -47,6 +47,27 @@ class SqlITCase extends StreamingWithStateTestBase {
(8L, 8, "Hello World"),
(20L, 20, "Hello World"))
+ /** test unbounded groupby (without window) **/
+ @Test
+ def testUnboundedGroupby(): Unit = {
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val sqlQuery = "SELECT b, COUNT(a) FROM MyTable GROUP BY b"
+
+ val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("MyTable", t)
+
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = mutable.MutableList("1,1", "2,1", "2,2")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
/** test selection **/
@Test
def testSelectExpressionFromTable(): Unit = {
http://git-wip-us.apache.org/repos/asf/flink/blob/24fa1a1c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
index 578a6a8..01263a3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
@@ -61,7 +61,7 @@ class WindowAggregateTest extends TableTestBase {
val sqlQuery = "SELECT a, AVG(c) OVER (PARTITION BY a ORDER BY procTime()" +
"RANGE BETWEEN INTERVAL '2' HOUR PRECEDING AND CURRENT ROW) AS avgA " +
"FROM MyTable"
- val expected =
+ val expected =
unaryNode(
"DataStreamCalc",
unaryNode(
@@ -71,7 +71,7 @@ class WindowAggregateTest extends TableTestBase {
streamTableNode(0),
term("select", "a", "c", "PROCTIME() AS $2")
),
- term("partitionBy","a"),
+ term("partitionBy", "a"),
term("orderBy", "PROCTIME"),
term("range", "BETWEEN 7200000 PRECEDING AND CURRENT ROW"),
term("select", "a", "c", "PROCTIME", "COUNT(c) AS w0$o0", "$SUM0(c) AS w0$o1")
@@ -83,6 +83,27 @@ class WindowAggregateTest extends TableTestBase {
}
@Test
+ def testGroupbyWithoutWindow() = {
+ val sql = "SELECT COUNT(a) FROM MyTable GROUP BY b"
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamGroupAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "b", "a")
+ ),
+ term("groupBy", "b"),
+ term("select", "b", "COUNT(a) AS EXPR$0")
+ ),
+ term("select", "EXPR$0")
+ )
+ streamUtil.verifySql(sql, expected)
+ }
+
+ @Test
def testTumbleFunction() = {
val sql =
@@ -96,7 +117,7 @@ class WindowAggregateTest extends TableTestBase {
unaryNode(
"DataStreamCalc",
unaryNode(
- "DataStreamAggregate",
+ "DataStreamGroupWindowAggregate",
unaryNode(
"DataStreamCalc",
streamTableNode(0),
@@ -122,7 +143,7 @@ class WindowAggregateTest extends TableTestBase {
unaryNode(
"DataStreamCalc",
unaryNode(
- "DataStreamAggregate",
+ "DataStreamGroupWindowAggregate",
unaryNode(
"DataStreamCalc",
streamTableNode(0),
@@ -150,7 +171,7 @@ class WindowAggregateTest extends TableTestBase {
unaryNode(
"DataStreamCalc",
unaryNode(
- "DataStreamAggregate",
+ "DataStreamGroupWindowAggregate",
unaryNode(
"DataStreamCalc",
streamTableNode(0),
@@ -200,21 +221,6 @@ class WindowAggregateTest extends TableTestBase {
streamUtil.verifySql(sql, "n/a")
}
- @Test(expected = classOf[TableException])
- def testMultiWindow() = {
- val sql = "SELECT COUNT(*) FROM MyTable GROUP BY " +
- "FLOOR(rowtime() TO HOUR), FLOOR(rowtime() TO MINUTE)"
- val expected = ""
- streamUtil.verifySql(sql, expected)
- }
-
- @Test(expected = classOf[TableException])
- def testInvalidWindowExpression() = {
- val sql = "SELECT COUNT(*) FROM MyTable GROUP BY FLOOR(localTimestamp TO HOUR)"
- val expected = ""
- streamUtil.verifySql(sql, expected)
- }
-
@Test
def testUnboundPartitionedProcessingWindowWithRange() = {
val sql = "SELECT " +
http://git-wip-us.apache.org/repos/asf/flink/blob/24fa1a1c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala
deleted file mode 100644
index 9f366a8..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.stream.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.stream.table.AggregationsITCase.TimestampAndWatermarkWithOffset
-import org.apache.flink.table.api.scala.stream.utils.StreamITCase
-import org.apache.flink.types.Row
-import org.junit.Assert._
-import org.junit.Test
-
-import scala.collection.mutable
-
-/**
- * We only test some aggregations until better testing of constructed DataStream
- * programs is possible.
- */
-class AggregationsITCase extends StreamingMultipleProgramsTestBase {
-
- val data = List(
- (1L, 1, "Hi"),
- (2L, 2, "Hello"),
- (4L, 2, "Hello"),
- (8L, 3, "Hello world"),
- (16L, 3, "Hello world"))
-
- @Test
- def testProcessingTimeSlidingGroupWindowOverCount(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
-
- val stream = env.fromCollection(data)
- val table = stream.toTable(tEnv, 'long, 'int, 'string)
-
- val windowedTable = table
- .window(Slide over 2.rows every 1.rows as 'w)
- .groupBy('w, 'string)
- .select('string, 'int.count, 'int.avg)
-
- val results = windowedTable.toDataStream[Row]
- results.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = Seq("Hello world,1,3", "Hello world,2,3", "Hello,1,2", "Hello,2,2", "Hi,1,1")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testEventTimeSessionGroupWindowOverTime(): Unit = {
- //To verify the "merge" functionality, we create this test with the following characteristics:
- // 1. set the Parallelism to 1, and have the test data out of order
- // 2. create a waterMark with 10ms offset to delay the window emission by 10ms
- val sessionWindowTestdata = List(
- (1L, 1, "Hello"),
- (2L, 2, "Hello"),
- (8L, 8, "Hello"),
- (9L, 9, "Hello World"),
- (4L, 4, "Hello"),
- (16L, 16, "Hello"))
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- env.setParallelism(1)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
-
- val stream = env
- .fromCollection(sessionWindowTestdata)
- .assignTimestampsAndWatermarks(new TimestampAndWatermarkWithOffset(10L))
- val table = stream.toTable(tEnv, 'long, 'int, 'string)
-
- val windowedTable = table
- .window(Session withGap 5.milli on 'rowtime as 'w)
- .groupBy('w, 'string)
- .select('string, 'int.count, 'int.sum)
-
- val results = windowedTable.toDataStream[Row]
- results.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = Seq("Hello World,1,9", "Hello,1,16", "Hello,4,15")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testAllProcessingTimeTumblingGroupWindowOverCount(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
-
- val stream = env.fromCollection(data)
- val table = stream.toTable(tEnv, 'long, 'int, 'string)
-
- val windowedTable = table
- .window(Tumble over 2.rows as 'w)
- .groupBy('w)
- .select('int.count)
-
- val results = windowedTable.toDataStream[Row]
- results.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = Seq("2", "2")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testEventTimeTumblingWindow(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
-
- val stream = env
- .fromCollection(data)
- .assignTimestampsAndWatermarks(new TimestampAndWatermarkWithOffset(0L))
- val table = stream.toTable(tEnv, 'long, 'int, 'string)
-
- val windowedTable = table
- .window(Tumble over 5.milli on 'rowtime as 'w)
- .groupBy('w, 'string)
- .select('string, 'int.count, 'int.avg, 'int.min, 'int.max, 'int.sum, 'w.start, 'w.end)
-
- val results = windowedTable.toDataStream[Row]
- results.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = Seq(
- "Hello world,1,3,3,3,3,1970-01-01 00:00:00.005,1970-01-01 00:00:00.01",
- "Hello world,1,3,3,3,3,1970-01-01 00:00:00.015,1970-01-01 00:00:00.02",
- "Hello,2,2,2,2,4,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
- "Hi,1,1,1,1,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-}
-
-object AggregationsITCase {
- class TimestampAndWatermarkWithOffset(
- offset: Long) extends AssignerWithPunctuatedWatermarks[(Long, Int, String)] {
-
- override def checkAndGetNextWatermark(
- lastElement: (Long, Int, String),
- extractedTimestamp: Long)
- : Watermark = {
- new Watermark(extractedTimestamp - offset)
- }
-
- override def extractTimestamp(
- element: (Long, Int, String),
- previousElementTimestamp: Long): Long = {
- element._1
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/24fa1a1c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala
new file mode 100644
index 0000000..271e90b
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+import scala.collection.mutable
+
+/**
+ * Tests for non-windowed groupBy aggregations on streaming tables.
+ * An updated aggregate result is emitted for every incoming row.
+ */
+class GroupAggregationsITCase extends StreamingWithStateTestBase {
+
+ @Test
+ def testNonKeyedGroupAggregate(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStateBackend(getStateBackend)
+ env.setParallelism(1)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+ .select('a.sum, 'b.sum)
+
+ val results = t.toDataStream[Row]
+ results.addSink(new StreamITCase.StringSink)
+ env.execute()
+
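+ // the non-keyed aggregation emits the running sums of a and b after every input row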
+ val expected = mutable.MutableList(
+ "1,1", "3,3", "6,5", "10,8", "15,11", "21,14", "28,18", "36,22", "45,26", "55,30", "66,35",
+ "78,40", "91,45", "105,50", "120,55", "136,61", "153,67", "171,73", "190,79", "210,85",
+ "231,91")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testGroupAggregate(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStateBackend(getStateBackend)
+ env.setParallelism(1)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+ .groupBy('b)
+ .select('b, 'a.sum)
+
+ val results = t.toDataStream[Row]
+ results.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "1,1", "2,2", "2,5", "3,4", "3,9", "3,15", "4,7", "4,15",
+ "4,24", "4,34", "5,11", "5,23", "5,36", "5,50", "5,65", "6,16", "6,33", "6,51", "6,70",
+ "6,90", "6,111")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testDoubleGroupAggregation(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStateBackend(getStateBackend)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+ .groupBy('b)
+ .select('a.sum as 'd, 'b)
+ .groupBy('b, 'd)
+ .select('b)
+
+ val results = t.toDataStream[Row]
+ results.addSink(new StreamITCase.StringSink)
+ env.execute()
+
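+ // every input row produces a new running sum 'd, i.e. a new (b, d) key for the outer
+ // aggregation, so 'b is emitted once per input row of its group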
+ val expected = mutable.MutableList(
+ "1",
+ "2", "2",
+ "3", "3", "3",
+ "4", "4", "4", "4",
+ "5", "5", "5", "5", "5",
+ "6", "6", "6", "6", "6", "6")
+
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testGroupAggregateWithExpression(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStateBackend(getStateBackend)
+ env.setParallelism(1)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
+ .groupBy('e, 'b % 3)
+ .select('c.min, 'e, 'a.avg, 'd.count)
+
+ val results = t.toDataStream[Row]
+ results.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "0,1,1,1", "1,2,2,1", "2,1,2,1", "3,2,3,1", "1,2,2,2",
+ "5,3,3,1", "3,2,3,2", "7,1,4,1", "2,1,3,2", "3,2,3,3", "7,1,4,2", "5,3,4,2", "12,3,5,1",
+ "1,2,3,3", "14,2,5,1")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/24fa1a1c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsTest.scala
new file mode 100644
index 0000000..520592c
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsTest.scala
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.table
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+import org.apache.flink.table.api.scala._
+import org.apache.flink.api.scala._
+import org.apache.flink.table.utils.TableTestUtil._
+
+class GroupAggregationsTest extends TableTestBase {
+
+ @Test(expected = classOf[ValidationException])
+ def testGroupingOnNonExistentField(): Unit = {
+ val util = streamTestUtil()
+ val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+
+ val ds = table
+ // must fail. '_foo is not a valid field
+ .groupBy('_foo)
+ .select('a.avg)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testGroupingInvalidSelection(): Unit = {
+ val util = streamTestUtil()
+ val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+
+ val ds = table
+ .groupBy('a, 'b)
+ // must fail. 'c is not a grouping key or aggregation
+ .select('c)
+ }
+
+ @Test
+ def testGroupbyWithoutWindow() = {
+ val util = streamTestUtil()
+ val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+
+ val resultTable = table
+ .groupBy('b)
+ .select('a.count)
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamGroupAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "b")
+ ),
+ term("groupBy", "b"),
+ term("select", "b", "COUNT(a) AS TMP_0")
+ ),
+ term("select", "TMP_0")
+ )
+ util.verifyTable(resultTable, expected)
+ }
+
+ @Test
+ def testGroupAggregateWithConstant1(): Unit = {
+ val util = streamTestUtil()
+ val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+
+ val resultTable = table
+ .select('a, 4 as 'four, 'b)
+ .groupBy('four, 'a)
+ .select('four, 'b.sum)
+
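+ // the constant key is projected as "4 AS four" before the aggregation and
+ // re-materialized as a literal in the final projection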
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamGroupAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "4 AS four", "b", "a")
+ ),
+ term("groupBy", "four", "a"),
+ term("select", "four", "a", "SUM(b) AS TMP_0")
+ ),
+ term("select", "4 AS four", "TMP_0")
+ )
+ util.verifyTable(resultTable, expected)
+ }
+
+ @Test
+ def testGroupAggregateWithConstant2(): Unit = {
+ val util = streamTestUtil()
+ val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+
+ val resultTable = table
+ .select('b, 4 as 'four, 'a)
+ .groupBy('b, 'four)
+ .select('four, 'a.sum)
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamGroupAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "4 AS four", "a", "b")
+ ),
+ term("groupBy", "four", "b"),
+ term("select", "four", "b", "SUM(a) AS TMP_0")
+ ),
+ term("select", "4 AS four", "TMP_0")
+ )
+ util.verifyTable(resultTable, expected)
+ }
+
+ @Test
+ def testGroupAggregateWithExpressionInSelect(): Unit = {
+ val util = streamTestUtil()
+ val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+
+ val resultTable = table
+ .select('a as 'a, 'b % 3 as 'd, 'c as 'c)
+ .groupBy('d)
+ .select('c.min, 'a.avg)
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamGroupAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "MOD(b, 3) AS d", "c")
+ ),
+ term("groupBy", "d"),
+ term("select", "d", "MIN(c) AS TMP_0", "AVG(a) AS TMP_1")
+ ),
+ term("select", "TMP_0", "TMP_1")
+ )
+ util.verifyTable(resultTable, expected)
+ }
+
+ @Test
+ def testGroupAggregateWithFilter(): Unit = {
+ val util = streamTestUtil()
+ val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+
+ val resultTable = table
+ .groupBy('b)
+ .select('b, 'a.sum)
+ .where('b === 2)
+
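+ // the filter on the grouping key b is pushed below the aggregation into the input Calc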
+ val expected =
+ unaryNode(
+ "DataStreamGroupAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "b", "a"),
+ term("where", "=(b, 2)")
+ ),
+ term("groupBy", "b"),
+ term("select", "b", "SUM(a) AS TMP_0")
+ )
+ util.verifyTable(resultTable, expected)
+ }
+
+ @Test
+ def testGroupAggregateWithAverage(): Unit = {
+ val util = streamTestUtil()
+ val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+
+ val resultTable = table
+ .groupBy('b)
+ .select('b, 'a.cast(BasicTypeInfo.DOUBLE_TYPE_INFO).avg)
+
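+ // the cast to DOUBLE is pre-computed in the input Calc (CAST(a) AS a0), so AVG runs on the casted field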
+ val expected =
+ unaryNode(
+ "DataStreamGroupAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "b", "a", "CAST(a) AS a0")
+ ),
+ term("groupBy", "b"),
+ term("select", "b", "AVG(a0) AS TMP_0")
+ )
+
+ util.verifyTable(resultTable, expected)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/24fa1a1c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
new file mode 100644
index 0000000..3e3c57d
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.table.GroupWindowAggregationsITCase.TimestampAndWatermarkWithOffset
+import org.apache.flink.table.api.scala.stream.utils.StreamITCase
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+/**
+ * We only test some aggregations until better testing of constructed DataStream
+ * programs is possible.
+ */
+class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase {
+
+ val data = List(
+ (1L, 1, "Hi"),
+ (2L, 2, "Hello"),
+ (4L, 2, "Hello"),
+ (8L, 3, "Hello world"),
+ (16L, 3, "Hello world"))
+
+ @Test
+ def testProcessingTimeSlidingGroupWindowOverCount(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val stream = env.fromCollection(data)
+ val table = stream.toTable(tEnv, 'long, 'int, 'string)
+
+ val windowedTable = table
+ .window(Slide over 2.rows every 1.rows as 'w)
+ .groupBy('w, 'string)
+ .select('string, 'int.count, 'int.avg)
+
+ val results = windowedTable.toDataStream[Row]
+ results.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = Seq("Hello world,1,3", "Hello world,2,3", "Hello,1,2", "Hello,2,2", "Hi,1,1")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testEventTimeSessionGroupWindowOverTime(): Unit = {
+ // To verify the "merge" functionality, this test has the following characteristics:
+ // 1. the parallelism is set to 1 and the test data is out of order
+ // 2. a watermark with a 10ms offset delays the window emission by 10ms
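+ // With the 5ms gap, the "Hello" rows at 1, 2, 4 and 8 merge into one session, while the row at 16 starts a new one.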
+ val sessionWindowTestdata = List(
+ (1L, 1, "Hello"),
+ (2L, 2, "Hello"),
+ (8L, 8, "Hello"),
+ (9L, 9, "Hello World"),
+ (4L, 4, "Hello"),
+ (16L, 16, "Hello"))
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ env.setParallelism(1)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val stream = env
+ .fromCollection(sessionWindowTestdata)
+ .assignTimestampsAndWatermarks(new TimestampAndWatermarkWithOffset(10L))
+ val table = stream.toTable(tEnv, 'long, 'int, 'string)
+
+ val windowedTable = table
+ .window(Session withGap 5.milli on 'rowtime as 'w)
+ .groupBy('w, 'string)
+ .select('string, 'int.count, 'int.sum)
+
+ val results = windowedTable.toDataStream[Row]
+ results.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = Seq("Hello World,1,9", "Hello,1,16", "Hello,4,15")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testAllProcessingTimeTumblingGroupWindowOverCount(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val stream = env.fromCollection(data)
+ val table = stream.toTable(tEnv, 'long, 'int, 'string)
+
+ val windowedTable = table
+ .window(Tumble over 2.rows as 'w)
+ .groupBy('w)
+ .select('int.count)
+
+ val results = windowedTable.toDataStream[Row]
+ results.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = Seq("2", "2")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testEventTimeTumblingWindow(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val stream = env
+ .fromCollection(data)
+ .assignTimestampsAndWatermarks(new TimestampAndWatermarkWithOffset(0L))
+ val table = stream.toTable(tEnv, 'long, 'int, 'string)
+
+ val windowedTable = table
+ .window(Tumble over 5.milli on 'rowtime as 'w)
+ .groupBy('w, 'string)
+ .select('string, 'int.count, 'int.avg, 'int.min, 'int.max, 'int.sum, 'w.start, 'w.end)
+
+ val results = windowedTable.toDataStream[Row]
+ results.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = Seq(
+ "Hello world,1,3,3,3,3,1970-01-01 00:00:00.005,1970-01-01 00:00:00.01",
+ "Hello world,1,3,3,3,3,1970-01-01 00:00:00.015,1970-01-01 00:00:00.02",
+ "Hello,2,2,2,2,4,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
+ "Hi,1,1,1,1,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+}
+
+object GroupWindowAggregationsITCase {
+ class TimestampAndWatermarkWithOffset(
+ offset: Long) extends AssignerWithPunctuatedWatermarks[(Long, Int, String)] {
+
+ override def checkAndGetNextWatermark(
+ lastElement: (Long, Int, String),
+ extractedTimestamp: Long)
+ : Watermark = {
+ new Watermark(extractedTimestamp - offset)
+ }
+
+ override def extractTimestamp(
+ element: (Long, Int, String),
+ previousElementTimestamp: Long): Long = {
+ element._1
+ }
+ }
+}