Posted to commits@flink.apache.org by fh...@apache.org on 2017/04/06 19:29:19 UTC
[12/12] flink git commit: [FLINK-6216] [table] Add non-windowed GroupBy aggregation for streams.
[FLINK-6216] [table] Add non-windowed GroupBy aggregation for streams.
This closes #3646.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ff262508
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ff262508
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ff262508
Branch: refs/heads/table-retraction
Commit: ff2625089e9f184326c7a8b39cbdbec35ba58869
Parents: c5173fa
Author: shaoxuan-wang <ws...@gmail.com>
Authored: Thu Mar 30 03:57:58 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Apr 6 21:28:57 2017 +0200
----------------------------------------------------------------------
.../flink/table/plan/logical/operators.scala | 3 -
.../nodes/datastream/DataStreamAggregate.scala | 267 -------------------
.../datastream/DataStreamGroupAggregate.scala | 133 +++++++++
.../DataStreamGroupWindowAggregate.scala | 267 +++++++++++++++++++
.../flink/table/plan/rules/FlinkRuleSets.scala | 3 +-
.../datastream/DataStreamAggregateRule.scala | 76 ------
.../DataStreamGroupAggregateRule.scala | 77 ++++++
.../DataStreamGroupWindowAggregateRule.scala | 75 ++++++
.../table/runtime/aggregate/AggregateUtil.scala | 46 +++-
.../aggregate/GroupAggProcessFunction.scala | 100 +++++++
.../scala/batch/table/FieldProjectionTest.scala | 4 +-
.../table/api/scala/stream/sql/SqlITCase.scala | 21 ++
.../scala/stream/sql/WindowAggregateTest.scala | 47 ++--
.../scala/stream/table/AggregationsITCase.scala | 167 ------------
.../stream/table/GroupAggregationsITCase.scala | 132 +++++++++
.../stream/table/GroupAggregationsTest.scala | 218 +++++++++++++++
.../table/GroupWindowAggregationsITCase.scala | 167 ++++++++++++
.../scala/stream/table/GroupWindowTest.scala | 56 ++--
.../scala/stream/table/UnsupportedOpsTest.scala | 7 -
19 files changed, 1288 insertions(+), 578 deletions(-)
----------------------------------------------------------------------
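For context: before this commit, Table API validation rejected any GroupBy aggregation on a stream table ("Aggregate on stream tables is currently not supported.", removed in the first hunk below). With the new DataStreamGroupAggregate translation, programs like the following minimal sketch become valid. The sketch assumes the Flink 1.2-era Scala Table API that the new tests use; the table and field names are illustrative.

    import org.apache.flink.api.scala._
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // a small example stream table
    val table = env.fromElements((1, 1L, "Hi"), (2, 2L, "Hello"), (3, 2L, "Hello"))
      .toTable(tEnv, 'a, 'b, 'c)

    // Table API: a continuously updated count per key, no window required
    val result = table.groupBy('b).select('b, 'a.count)

    // equivalent SQL, as exercised by the new SqlITCase test below
    tEnv.registerTable("MyTable", table)
    val sqlResult = tEnv.sql("SELECT b, COUNT(a) FROM MyTable GROUP BY b")

Each incoming row updates per-key accumulator state and emits a refreshed result row, so the output is a stream of updates rather than a single final value.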
http://git-wip-us.apache.org/repos/asf/flink/blob/ff262508/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
index 559bd75..7438082 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
@@ -221,9 +221,6 @@ case class Aggregate(
}
override def validate(tableEnv: TableEnvironment): LogicalNode = {
- if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
- failValidation(s"Aggregate on stream tables is currently not supported.")
- }
val resolvedAggregate = super.validate(tableEnv).asInstanceOf[Aggregate]
val groupingExprs = resolvedAggregate.groupingExpressions
http://git-wip-us.apache.org/repos/asf/flink/blob/ff262508/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala
deleted file mode 100644
index 50f8281..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.plan.nodes.datastream
-
-import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.AggregateCall
-import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
-import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream}
-import org.apache.flink.streaming.api.windowing.assigners._
-import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
-import org.apache.flink.table.api.StreamTableEnvironment
-import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
-import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.expressions._
-import org.apache.flink.table.plan.logical._
-import org.apache.flink.table.plan.nodes.CommonAggregate
-import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate._
-import org.apache.flink.table.runtime.aggregate.AggregateUtil._
-import org.apache.flink.table.runtime.aggregate._
-import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
-import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
-import org.apache.flink.types.Row
-
-class DataStreamAggregate(
- window: LogicalWindow,
- namedProperties: Seq[NamedWindowProperty],
- cluster: RelOptCluster,
- traitSet: RelTraitSet,
- inputNode: RelNode,
- namedAggregates: Seq[CalcitePair[AggregateCall, String]],
- rowRelDataType: RelDataType,
- inputType: RelDataType,
- grouping: Array[Int])
- extends SingleRel(cluster, traitSet, inputNode) with CommonAggregate with DataStreamRel {
-
- override def deriveRowType(): RelDataType = rowRelDataType
-
- override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
- new DataStreamAggregate(
- window,
- namedProperties,
- cluster,
- traitSet,
- inputs.get(0),
- namedAggregates,
- getRowType,
- inputType,
- grouping)
- }
-
- override def toString: String = {
- s"Aggregate(${
- if (!grouping.isEmpty) {
- s"groupBy: (${groupingToString(inputType, grouping)}), "
- } else {
- ""
- }
- }window: ($window), " +
- s"select: (${
- aggregationToString(
- inputType,
- grouping,
- getRowType,
- namedAggregates,
- namedProperties)
- }))"
- }
-
- override def explainTerms(pw: RelWriter): RelWriter = {
- super.explainTerms(pw)
- .itemIf("groupBy", groupingToString(inputType, grouping), !grouping.isEmpty)
- .item("window", window)
- .item(
- "select", aggregationToString(
- inputType,
- grouping,
- getRowType,
- namedAggregates,
- namedProperties))
- }
-
- override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
-
- val groupingKeys = grouping.indices.toArray
- val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
-
- val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
-
- val aggString = aggregationToString(
- inputType,
- grouping,
- getRowType,
- namedAggregates,
- namedProperties)
-
- val keyedAggOpName = s"groupBy: (${groupingToString(inputType, grouping)}), " +
- s"window: ($window), " +
- s"select: ($aggString)"
- val nonKeyedAggOpName = s"window: ($window), select: ($aggString)"
-
- // grouped / keyed aggregation
- if (groupingKeys.length > 0) {
- val windowFunction = AggregateUtil.createAggregationGroupWindowFunction(
- window,
- groupingKeys.length,
- namedAggregates.size,
- rowRelDataType.getFieldCount,
- namedProperties)
-
- val keyedStream = inputDS.keyBy(groupingKeys: _*)
- val windowedStream =
- createKeyedWindowedStream(window, keyedStream)
- .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
-
- val (aggFunction, accumulatorRowType, aggResultRowType) =
- AggregateUtil.createDataStreamAggregateFunction(
- namedAggregates,
- inputType,
- rowRelDataType,
- grouping)
-
- windowedStream
- .aggregate(aggFunction, windowFunction, accumulatorRowType, aggResultRowType, rowTypeInfo)
- .name(keyedAggOpName)
- }
- // global / non-keyed aggregation
- else {
- val windowFunction = AggregateUtil.createAggregationAllWindowFunction(
- window,
- rowRelDataType.getFieldCount,
- namedProperties)
-
- val windowedStream =
- createNonKeyedWindowedStream(window, inputDS)
- .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
-
- val (aggFunction, accumulatorRowType, aggResultRowType) =
- AggregateUtil.createDataStreamAggregateFunction(
- namedAggregates,
- inputType,
- rowRelDataType,
- grouping)
-
- windowedStream
- .aggregate(aggFunction, windowFunction, accumulatorRowType, aggResultRowType, rowTypeInfo)
- .name(nonKeyedAggOpName)
- }
- }
-}
-
-object DataStreamAggregate {
-
-
- private def createKeyedWindowedStream(groupWindow: LogicalWindow, stream: KeyedStream[Row, Tuple])
- : WindowedStream[Row, Tuple, _ <: DataStreamWindow] = groupWindow match {
-
- case ProcessingTimeTumblingGroupWindow(_, size) if isTimeInterval(size.resultType) =>
- stream.window(TumblingProcessingTimeWindows.of(asTime(size)))
-
- case ProcessingTimeTumblingGroupWindow(_, size) =>
- stream.countWindow(asCount(size))
-
- case EventTimeTumblingGroupWindow(_, _, size) if isTimeInterval(size.resultType) =>
- stream.window(TumblingEventTimeWindows.of(asTime(size)))
-
- case EventTimeTumblingGroupWindow(_, _, size) =>
- // TODO: EventTimeTumblingGroupWindow should sort the stream on event time
- // before applying the windowing logic. Otherwise, this would be the same as a
- // ProcessingTimeTumblingGroupWindow
- throw new UnsupportedOperationException(
- "Event-time grouping windows on row intervals are currently not supported.")
-
- case ProcessingTimeSlidingGroupWindow(_, size, slide) if isTimeInterval(size.resultType) =>
- stream.window(SlidingProcessingTimeWindows.of(asTime(size), asTime(slide)))
-
- case ProcessingTimeSlidingGroupWindow(_, size, slide) =>
- stream.countWindow(asCount(size), asCount(slide))
-
- case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
- stream.window(SlidingEventTimeWindows.of(asTime(size), asTime(slide)))
-
- case EventTimeSlidingGroupWindow(_, _, size, slide) =>
- // TODO: EventTimeTumblingGroupWindow should sort the stream on event time
- // before applying the windowing logic. Otherwise, this would be the same as a
- // ProcessingTimeTumblingGroupWindow
- throw new UnsupportedOperationException(
- "Event-time grouping windows on row intervals are currently not supported.")
-
- case ProcessingTimeSessionGroupWindow(_, gap: Expression) =>
- stream.window(ProcessingTimeSessionWindows.withGap(asTime(gap)))
-
- case EventTimeSessionGroupWindow(_, _, gap) =>
- stream.window(EventTimeSessionWindows.withGap(asTime(gap)))
- }
-
- private def createNonKeyedWindowedStream(groupWindow: LogicalWindow, stream: DataStream[Row])
- : AllWindowedStream[Row, _ <: DataStreamWindow] = groupWindow match {
-
- case ProcessingTimeTumblingGroupWindow(_, size) if isTimeInterval(size.resultType) =>
- stream.windowAll(TumblingProcessingTimeWindows.of(asTime(size)))
-
- case ProcessingTimeTumblingGroupWindow(_, size) =>
- stream.countWindowAll(asCount(size))
-
- case EventTimeTumblingGroupWindow(_, _, size) if isTimeInterval(size.resultType) =>
- stream.windowAll(TumblingEventTimeWindows.of(asTime(size)))
-
- case EventTimeTumblingGroupWindow(_, _, size) =>
- // TODO: EventTimeTumblingGroupWindow should sort the stream on event time
- // before applying the windowing logic. Otherwise, this would be the same as a
- // ProcessingTimeTumblingGroupWindow
- throw new UnsupportedOperationException(
- "Event-time grouping windows on row intervals are currently not supported.")
-
- case ProcessingTimeSlidingGroupWindow(_, size, slide) if isTimeInterval(size.resultType) =>
- stream.windowAll(SlidingProcessingTimeWindows.of(asTime(size), asTime(slide)))
-
- case ProcessingTimeSlidingGroupWindow(_, size, slide) =>
- stream.countWindowAll(asCount(size), asCount(slide))
-
- case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
- stream.windowAll(SlidingEventTimeWindows.of(asTime(size), asTime(slide)))
-
- case EventTimeSlidingGroupWindow(_, _, size, slide) =>
- // TODO: EventTimeTumblingGroupWindow should sort the stream on event time
- // before applying the windowing logic. Otherwise, this would be the same as a
- // ProcessingTimeTumblingGroupWindow
- throw new UnsupportedOperationException(
- "Event-time grouping windows on row intervals are currently not supported.")
-
- case ProcessingTimeSessionGroupWindow(_, gap) =>
- stream.windowAll(ProcessingTimeSessionWindows.withGap(asTime(gap)))
-
- case EventTimeSessionGroupWindow(_, _, gap) =>
- stream.windowAll(EventTimeSessionWindows.withGap(asTime(gap)))
- }
-
- def asTime(expr: Expression): Time = expr match {
- case Literal(value: Long, TimeIntervalTypeInfo.INTERVAL_MILLIS) => Time.milliseconds(value)
- case _ => throw new IllegalArgumentException()
- }
-
- def asCount(expr: Expression): Long = expr match {
- case Literal(value: Long, RowIntervalTypeInfo.INTERVAL_ROWS) => value
- case _ => throw new IllegalArgumentException()
- }
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/ff262508/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
new file mode 100644
index 0000000..c2d4fb7
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.plan.nodes.CommonAggregate
+import org.apache.flink.types.Row
+import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
+
+/**
+ *
+ * Flink RelNode for an unbounded group aggregate on a data stream.
+ *
+ * @param cluster Cluster of the RelNode, representing the environment of related
+ * relational expressions during the optimization of a query.
+ * @param traitSet Trait set of the RelNode
+ * @param inputNode The input RelNode of aggregation
+ * @param namedAggregates List of calls to aggregate functions and their output field names
+ * @param rowRelDataType The type of the rows of the RelNode
+ * @param inputType The type of the rows of aggregation input RelNode
+ * @param groupings The positions (in the input Row) of the grouping keys
+ */
+class DataStreamGroupAggregate(
+ cluster: RelOptCluster,
+ traitSet: RelTraitSet,
+ inputNode: RelNode,
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+ rowRelDataType: RelDataType,
+ inputType: RelDataType,
+ groupings: Array[Int])
+ extends SingleRel(cluster, traitSet, inputNode)
+ with CommonAggregate
+ with DataStreamRel {
+
+ override def deriveRowType() = rowRelDataType
+
+ override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+ new DataStreamGroupAggregate(
+ cluster,
+ traitSet,
+ inputs.get(0),
+ namedAggregates,
+ getRowType,
+ inputType,
+ groupings)
+ }
+
+ override def toString: String = {
+ s"Aggregate(${
+ if (!groupings.isEmpty) {
+ s"groupBy: (${groupingToString(inputType, groupings)}), "
+ } else {
+ ""
+ }
+ }select: (${aggregationToString(inputType, groupings, getRowType, namedAggregates, Nil)}))"
+ }
+
+ override def explainTerms(pw: RelWriter): RelWriter = {
+ super.explainTerms(pw)
+ .itemIf("groupBy", groupingToString(inputType, groupings), !groupings.isEmpty)
+ .item("select", aggregationToString(inputType, groupings, getRowType, namedAggregates, Nil))
+ }
+
+ override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
+
+ val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+
+ val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
+
+ val aggString = aggregationToString(
+ inputType,
+ groupings,
+ getRowType,
+ namedAggregates,
+ Nil)
+
+ val keyedAggOpName = s"groupBy: (${groupingToString(inputType, groupings)}), " +
+ s"select: ($aggString)"
+ val nonKeyedAggOpName = s"select: ($aggString)"
+
+ val processFunction = AggregateUtil.createGroupAggregateFunction(
+ namedAggregates,
+ inputType,
+ groupings)
+
+ val result: DataStream[Row] =
+ // grouped / keyed aggregation
+ if (groupings.nonEmpty) {
+ inputDS
+ .keyBy(groupings: _*)
+ .process(processFunction)
+ .returns(rowTypeInfo)
+ .name(keyedAggOpName)
+ .asInstanceOf[DataStream[Row]]
+ }
+ // global / non-keyed aggregation
+ else {
+ inputDS
+ .keyBy(new NullByteKeySelector[Row])
+ .process(processFunction)
+ .setParallelism(1)
+ .setMaxParallelism(1)
+ .returns(rowTypeInfo)
+ .name(nonKeyedAggOpName)
+ .asInstanceOf[DataStream[Row]]
+ }
+ result
+ }
+}
+
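A note on the non-keyed branch above: ValueState, which the new GroupAggProcessFunction below relies on, is only available on keyed streams, so a global aggregate is funneled through an artificial constant key and pinned to parallelism 1. Conceptually, NullByteKeySelector amounts to the following sketch (ConstantKeySelector is an illustrative name, not part of this commit):

    import org.apache.flink.api.java.functions.KeySelector

    // Every record maps to the same Byte key, so downstream keyed state
    // sees exactly one group; correctness then requires parallelism 1.
    class ConstantKeySelector[T] extends KeySelector[T, java.lang.Byte] {
      override def getKey(value: T): java.lang.Byte = java.lang.Byte.valueOf(0.toByte)
    }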
http://git-wip-us.apache.org/repos/asf/flink/blob/ff262508/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
new file mode 100644
index 0000000..a0c1dec
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream}
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.plan.nodes.CommonAggregate
+import org.apache.flink.table.plan.nodes.datastream.DataStreamGroupWindowAggregate._
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
+import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
+import org.apache.flink.types.Row
+
+class DataStreamGroupWindowAggregate(
+ window: LogicalWindow,
+ namedProperties: Seq[NamedWindowProperty],
+ cluster: RelOptCluster,
+ traitSet: RelTraitSet,
+ inputNode: RelNode,
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+ rowRelDataType: RelDataType,
+ inputType: RelDataType,
+ grouping: Array[Int])
+ extends SingleRel(cluster, traitSet, inputNode) with CommonAggregate with DataStreamRel {
+
+ override def deriveRowType(): RelDataType = rowRelDataType
+
+ override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+ new DataStreamGroupWindowAggregate(
+ window,
+ namedProperties,
+ cluster,
+ traitSet,
+ inputs.get(0),
+ namedAggregates,
+ getRowType,
+ inputType,
+ grouping)
+ }
+
+ override def toString: String = {
+ s"Aggregate(${
+ if (!grouping.isEmpty) {
+ s"groupBy: (${groupingToString(inputType, grouping)}), "
+ } else {
+ ""
+ }
+ }window: ($window), " +
+ s"select: (${
+ aggregationToString(
+ inputType,
+ grouping,
+ getRowType,
+ namedAggregates,
+ namedProperties)
+ }))"
+ }
+
+ override def explainTerms(pw: RelWriter): RelWriter = {
+ super.explainTerms(pw)
+ .itemIf("groupBy", groupingToString(inputType, grouping), !grouping.isEmpty)
+ .item("window", window)
+ .item(
+ "select", aggregationToString(
+ inputType,
+ grouping,
+ getRowType,
+ namedAggregates,
+ namedProperties))
+ }
+
+ override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
+
+ val groupingKeys = grouping.indices.toArray
+ val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+
+ val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
+
+ val aggString = aggregationToString(
+ inputType,
+ grouping,
+ getRowType,
+ namedAggregates,
+ namedProperties)
+
+ val keyedAggOpName = s"groupBy: (${groupingToString(inputType, grouping)}), " +
+ s"window: ($window), " +
+ s"select: ($aggString)"
+ val nonKeyedAggOpName = s"window: ($window), select: ($aggString)"
+
+ // grouped / keyed aggregation
+ if (groupingKeys.length > 0) {
+ val windowFunction = AggregateUtil.createAggregationGroupWindowFunction(
+ window,
+ groupingKeys.length,
+ namedAggregates.size,
+ rowRelDataType.getFieldCount,
+ namedProperties)
+
+ val keyedStream = inputDS.keyBy(groupingKeys: _*)
+ val windowedStream =
+ createKeyedWindowedStream(window, keyedStream)
+ .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
+
+ val (aggFunction, accumulatorRowType, aggResultRowType) =
+ AggregateUtil.createDataStreamGroupWindowAggregateFunction(
+ namedAggregates,
+ inputType,
+ rowRelDataType,
+ grouping)
+
+ windowedStream
+ .aggregate(aggFunction, windowFunction, accumulatorRowType, aggResultRowType, rowTypeInfo)
+ .name(keyedAggOpName)
+ }
+ // global / non-keyed aggregation
+ else {
+ val windowFunction = AggregateUtil.createAggregationAllWindowFunction(
+ window,
+ rowRelDataType.getFieldCount,
+ namedProperties)
+
+ val windowedStream =
+ createNonKeyedWindowedStream(window, inputDS)
+ .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
+
+ val (aggFunction, accumulatorRowType, aggResultRowType) =
+ AggregateUtil.createDataStreamGroupWindowAggregateFunction(
+ namedAggregates,
+ inputType,
+ rowRelDataType,
+ grouping)
+
+ windowedStream
+ .aggregate(aggFunction, windowFunction, accumulatorRowType, aggResultRowType, rowTypeInfo)
+ .name(nonKeyedAggOpName)
+ }
+ }
+}
+
+object DataStreamGroupWindowAggregate {
+
+
+ private def createKeyedWindowedStream(groupWindow: LogicalWindow, stream: KeyedStream[Row, Tuple])
+ : WindowedStream[Row, Tuple, _ <: DataStreamWindow] = groupWindow match {
+
+ case ProcessingTimeTumblingGroupWindow(_, size) if isTimeInterval(size.resultType) =>
+ stream.window(TumblingProcessingTimeWindows.of(asTime(size)))
+
+ case ProcessingTimeTumblingGroupWindow(_, size) =>
+ stream.countWindow(asCount(size))
+
+ case EventTimeTumblingGroupWindow(_, _, size) if isTimeInterval(size.resultType) =>
+ stream.window(TumblingEventTimeWindows.of(asTime(size)))
+
+ case EventTimeTumblingGroupWindow(_, _, size) =>
+ // TODO: EventTimeTumblingGroupWindow should sort the stream on event time
+ // before applying the windowing logic. Otherwise, this would be the same as a
+ // ProcessingTimeTumblingGroupWindow
+ throw new UnsupportedOperationException(
+ "Event-time grouping windows on row intervals are currently not supported.")
+
+ case ProcessingTimeSlidingGroupWindow(_, size, slide) if isTimeInterval(size.resultType) =>
+ stream.window(SlidingProcessingTimeWindows.of(asTime(size), asTime(slide)))
+
+ case ProcessingTimeSlidingGroupWindow(_, size, slide) =>
+ stream.countWindow(asCount(size), asCount(slide))
+
+ case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
+ stream.window(SlidingEventTimeWindows.of(asTime(size), asTime(slide)))
+
+ case EventTimeSlidingGroupWindow(_, _, size, slide) =>
+ // TODO: EventTimeTumblingGroupWindow should sort the stream on event time
+ // before applying the windowing logic. Otherwise, this would be the same as a
+ // ProcessingTimeTumblingGroupWindow
+ throw new UnsupportedOperationException(
+ "Event-time grouping windows on row intervals are currently not supported.")
+
+ case ProcessingTimeSessionGroupWindow(_, gap: Expression) =>
+ stream.window(ProcessingTimeSessionWindows.withGap(asTime(gap)))
+
+ case EventTimeSessionGroupWindow(_, _, gap) =>
+ stream.window(EventTimeSessionWindows.withGap(asTime(gap)))
+ }
+
+ private def createNonKeyedWindowedStream(groupWindow: LogicalWindow, stream: DataStream[Row])
+ : AllWindowedStream[Row, _ <: DataStreamWindow] = groupWindow match {
+
+ case ProcessingTimeTumblingGroupWindow(_, size) if isTimeInterval(size.resultType) =>
+ stream.windowAll(TumblingProcessingTimeWindows.of(asTime(size)))
+
+ case ProcessingTimeTumblingGroupWindow(_, size) =>
+ stream.countWindowAll(asCount(size))
+
+ case EventTimeTumblingGroupWindow(_, _, size) if isTimeInterval(size.resultType) =>
+ stream.windowAll(TumblingEventTimeWindows.of(asTime(size)))
+
+ case EventTimeTumblingGroupWindow(_, _, size) =>
+ // TODO: EventTimeTumblingGroupWindow should sort the stream on event time
+ // before applying the windowing logic. Otherwise, this would be the same as a
+ // ProcessingTimeTumblingGroupWindow
+ throw new UnsupportedOperationException(
+ "Event-time grouping windows on row intervals are currently not supported.")
+
+ case ProcessingTimeSlidingGroupWindow(_, size, slide) if isTimeInterval(size.resultType) =>
+ stream.windowAll(SlidingProcessingTimeWindows.of(asTime(size), asTime(slide)))
+
+ case ProcessingTimeSlidingGroupWindow(_, size, slide) =>
+ stream.countWindowAll(asCount(size), asCount(slide))
+
+ case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
+ stream.windowAll(SlidingEventTimeWindows.of(asTime(size), asTime(slide)))
+
+ case EventTimeSlidingGroupWindow(_, _, size, slide) =>
+ // TODO: EventTimeTumblingGroupWindow should sort the stream on event time
+ // before applying the windowing logic. Otherwise, this would be the same as a
+ // ProcessingTimeTumblingGroupWindow
+ throw new UnsupportedOperationException(
+ "Event-time grouping windows on row intervals are currently not supported.")
+
+ case ProcessingTimeSessionGroupWindow(_, gap) =>
+ stream.windowAll(ProcessingTimeSessionWindows.withGap(asTime(gap)))
+
+ case EventTimeSessionGroupWindow(_, _, gap) =>
+ stream.windowAll(EventTimeSessionWindows.withGap(asTime(gap)))
+ }
+
+ def asTime(expr: Expression): Time = expr match {
+ case Literal(value: Long, TimeIntervalTypeInfo.INTERVAL_MILLIS) => Time.milliseconds(value)
+ case _ => throw new IllegalArgumentException()
+ }
+
+ def asCount(expr: Expression): Long = expr match {
+ case Literal(value: Long, RowIntervalTypeInfo.INTERVAL_ROWS) => value
+ case _ => throw new IllegalArgumentException()
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/ff262508/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index 5caaf1f..6805a68 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -171,8 +171,9 @@ object FlinkRuleSets {
UnionEliminatorRule.INSTANCE,
// translate to DataStream nodes
+ DataStreamGroupAggregateRule.INSTANCE,
DataStreamOverAggregateRule.INSTANCE,
- DataStreamAggregateRule.INSTANCE,
+ DataStreamGroupWindowAggregateRule.INSTANCE,
DataStreamCalcRule.INSTANCE,
DataStreamScanRule.INSTANCE,
DataStreamUnionRule.INSTANCE,
http://git-wip-us.apache.org/repos/asf/flink/blob/ff262508/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala
deleted file mode 100644
index 09f05d7..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.plan.rules.datastream
-
-import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.flink.table.api.TableException
-import org.apache.flink.table.expressions.Alias
-import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
-import org.apache.flink.table.plan.nodes.datastream.{DataStreamAggregate, DataStreamConvention}
-
-import scala.collection.JavaConversions._
-
-class DataStreamAggregateRule
- extends ConverterRule(
- classOf[LogicalWindowAggregate],
- Convention.NONE,
- DataStreamConvention.INSTANCE,
- "DataStreamAggregateRule") {
-
- override def matches(call: RelOptRuleCall): Boolean = {
- val agg: LogicalWindowAggregate = call.rel(0).asInstanceOf[LogicalWindowAggregate]
-
- // check if we have distinct aggregates
- val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
- if (distinctAggs) {
- throw TableException("DISTINCT aggregates are currently not supported.")
- }
-
- // check if we have grouping sets
- val groupSets = agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) != agg.getGroupSet
- if (groupSets || agg.indicator) {
- throw TableException("GROUPING SETS are currently not supported.")
- }
-
- !distinctAggs && !groupSets && !agg.indicator
- }
-
- override def convert(rel: RelNode): RelNode = {
- val agg: LogicalWindowAggregate = rel.asInstanceOf[LogicalWindowAggregate]
- val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
- val convInput: RelNode = RelOptRule.convert(agg.getInput, DataStreamConvention.INSTANCE)
-
- new DataStreamAggregate(
- agg.getWindow,
- agg.getNamedProperties,
- rel.getCluster,
- traitSet,
- convInput,
- agg.getNamedAggCalls,
- rel.getRowType,
- agg.getInput.getRowType,
- agg.getGroupSet.toArray)
- }
- }
-
-object DataStreamAggregateRule {
- val INSTANCE: RelOptRule = new DataStreamAggregateRule
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff262508/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamGroupAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamGroupAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamGroupAggregateRule.scala
new file mode 100644
index 0000000..82d7104
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamGroupAggregateRule.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalAggregate
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.plan.nodes.datastream.{DataStreamGroupAggregate, DataStreamConvention}
+
+import scala.collection.JavaConversions._
+
+/**
+ * Rule to convert a [[LogicalAggregate]] into a [[DataStreamGroupAggregate]].
+ */
+class DataStreamGroupAggregateRule
+ extends ConverterRule(
+ classOf[LogicalAggregate],
+ Convention.NONE,
+ DataStreamConvention.INSTANCE,
+ "DataStreamGroupAggregateRule") {
+
+ override def matches(call: RelOptRuleCall): Boolean = {
+ val agg: LogicalAggregate = call.rel(0).asInstanceOf[LogicalAggregate]
+
+ // check if we have distinct aggregates
+ val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
+ if (distinctAggs) {
+ throw TableException("DISTINCT aggregates are currently not supported.")
+ }
+
+ // check if we have grouping sets
+ val groupSets = agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) != agg.getGroupSet
+ if (groupSets || agg.indicator) {
+ throw TableException("GROUPING SETS are currently not supported.")
+ }
+
+ !distinctAggs && !groupSets && !agg.indicator
+ }
+
+ override def convert(rel: RelNode): RelNode = {
+ val agg: LogicalAggregate = rel.asInstanceOf[LogicalAggregate]
+ val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
+ val convInput: RelNode = RelOptRule.convert(agg.getInput, DataStreamConvention.INSTANCE)
+
+ new DataStreamGroupAggregate(
+ rel.getCluster,
+ traitSet,
+ convInput,
+ agg.getNamedAggCalls,
+ rel.getRowType,
+ agg.getInput.getRowType,
+ agg.getGroupSet.toArray)
+ }
+}
+
+object DataStreamGroupAggregateRule {
+ val INSTANCE: RelOptRule = new DataStreamGroupAggregateRule
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/ff262508/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamGroupWindowAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamGroupWindowAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamGroupWindowAggregateRule.scala
new file mode 100644
index 0000000..7ec1d40
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamGroupWindowAggregateRule.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
+import org.apache.flink.table.plan.nodes.datastream.{DataStreamConvention, DataStreamGroupWindowAggregate}
+
+import scala.collection.JavaConversions._
+
+class DataStreamGroupWindowAggregateRule
+ extends ConverterRule(
+ classOf[LogicalWindowAggregate],
+ Convention.NONE,
+ DataStreamConvention.INSTANCE,
+ "DataStreamGroupWindowAggregateRule") {
+
+ override def matches(call: RelOptRuleCall): Boolean = {
+ val agg: LogicalWindowAggregate = call.rel(0).asInstanceOf[LogicalWindowAggregate]
+
+ // check if we have distinct aggregates
+ val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
+ if (distinctAggs) {
+ throw TableException("DISTINCT aggregates are currently not supported.")
+ }
+
+ // check if we have grouping sets
+ val groupSets = agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) != agg.getGroupSet
+ if (groupSets || agg.indicator) {
+ throw TableException("GROUPING SETS are currently not supported.")
+ }
+
+ !distinctAggs && !groupSets && !agg.indicator
+ }
+
+ override def convert(rel: RelNode): RelNode = {
+ val agg: LogicalWindowAggregate = rel.asInstanceOf[LogicalWindowAggregate]
+ val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
+ val convInput: RelNode = RelOptRule.convert(agg.getInput, DataStreamConvention.INSTANCE)
+
+ new DataStreamGroupWindowAggregate(
+ agg.getWindow,
+ agg.getNamedProperties,
+ rel.getCluster,
+ traitSet,
+ convInput,
+ agg.getNamedAggCalls,
+ rel.getRowType,
+ agg.getInput.getRowType,
+ agg.getGroupSet.toArray)
+ }
+ }
+
+object DataStreamGroupWindowAggregateRule {
+ val INSTANCE: RelOptRule = new DataStreamGroupWindowAggregateRule
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff262508/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 09d1a13..634f7c8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -79,8 +79,7 @@ object AggregateUtil {
inputType,
needRetraction = false)
- val aggregationStateType: RowTypeInfo =
- createDataSetAggregateBufferDataType(Array(), aggregates, inputType)
+ val aggregationStateType: RowTypeInfo = createAccumulatorRowType(aggregates)
val forwardMapping = (0 until inputType.getFieldCount).map(x => (x, x)).toArray
val aggMapping = aggregates.indices.map(x => x + inputType.getFieldCount).toArray
@@ -125,7 +124,36 @@ object AggregateUtil {
}
/**
- * Create an [[org.apache.flink.streaming.api.functions.ProcessFunction]] for
+ * Create an [[org.apache.flink.streaming.api.functions.ProcessFunction]] for a non-windowed
+ * group aggregate that evaluates the final aggregate value.
+ *
+ * @param namedAggregates List of calls to aggregate functions and their output field names
+ * @param inputType Input row type
+ * @param groupings The positions (in the input Row) of the grouping keys
+ * @return [[org.apache.flink.streaming.api.functions.ProcessFunction]]
+ */
+ private[flink] def createGroupAggregateFunction(
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+ inputType: RelDataType,
+ groupings: Array[Int]): ProcessFunction[Row, Row] = {
+
+ val (aggFields, aggregates) =
+ transformToAggregateFunctions(
+ namedAggregates.map(_.getKey),
+ inputType,
+ needRetraction = false)
+
+ val aggregationStateType: RowTypeInfo = createAccumulatorRowType(aggregates)
+
+ new GroupAggProcessFunction(
+ aggregates,
+ aggFields,
+ groupings,
+ aggregationStateType)
+ }
+
+ /**
+ * Create an [[org.apache.flink.streaming.api.functions.ProcessFunction]] for ROWS clause
* bounded OVER window to evaluate final aggregate value.
*
* @param generator code generator instance
@@ -238,7 +266,7 @@ object AggregateUtil {
needRetraction = false)
val mapReturnType: RowTypeInfo =
- createDataSetAggregateBufferDataType(
+ createRowTypeForKeysAndAggregates(
groupings,
aggregates,
inputType,
@@ -321,7 +349,7 @@ object AggregateUtil {
inputType,
needRetraction = false)._2
- val returnType: RowTypeInfo = createDataSetAggregateBufferDataType(
+ val returnType: RowTypeInfo = createRowTypeForKeysAndAggregates(
groupings,
aggregates,
inputType,
@@ -550,7 +578,7 @@ object AggregateUtil {
window match {
case EventTimeSessionGroupWindow(_, _, gap) =>
val combineReturnType: RowTypeInfo =
- createDataSetAggregateBufferDataType(
+ createRowTypeForKeysAndAggregates(
groupings,
aggregates,
inputType,
@@ -600,7 +628,7 @@ object AggregateUtil {
case EventTimeSessionGroupWindow(_, _, gap) =>
val combineReturnType: RowTypeInfo =
- createDataSetAggregateBufferDataType(
+ createRowTypeForKeysAndAggregates(
groupings,
aggregates,
inputType,
@@ -745,7 +773,7 @@ object AggregateUtil {
}
}
- private[flink] def createDataStreamAggregateFunction(
+ private[flink] def createDataStreamGroupWindowAggregateFunction(
namedAggregates: Seq[CalcitePair[AggregateCall, String]],
inputType: RelDataType,
outputType: RelDataType,
@@ -1125,7 +1153,7 @@ object AggregateUtil {
aggTypes
}
- private def createDataSetAggregateBufferDataType(
+ private def createRowTypeForKeysAndAggregates(
groupings: Array[Int],
aggregates: Array[TableAggregateFunction[_]],
inputType: RelDataType,
http://git-wip-us.apache.org/repos/asf/flink/blob/ff262508/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
new file mode 100644
index 0000000..3813aa0
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+/**
+ * ProcessFunction used for non-windowed groupBy aggregates
+ *
+ * @param aggregates the list of all
+ * [[org.apache.flink.table.functions.AggregateFunction]] used for
+ * this aggregation
+ * @param aggFields the position (in the input Row) of the input value for each
+ * aggregate
+ * @param groupings the positions (in the input Row) of the grouping keys
+ * @param aggregationStateType the row type info of the accumulator state
+ */
+class GroupAggProcessFunction(
+ private val aggregates: Array[AggregateFunction[_]],
+ private val aggFields: Array[Array[Int]],
+ private val groupings: Array[Int],
+ private val aggregationStateType: RowTypeInfo)
+ extends ProcessFunction[Row, Row] {
+
+ Preconditions.checkNotNull(aggregates)
+ Preconditions.checkNotNull(aggFields)
+ Preconditions.checkArgument(aggregates.length == aggFields.length)
+
+ private var output: Row = _
+ private var state: ValueState[Row] = _
+
+ override def open(config: Configuration) {
+ output = new Row(groupings.length + aggregates.length)
+ val stateDescriptor: ValueStateDescriptor[Row] =
+ new ValueStateDescriptor[Row]("GroupAggregateState", aggregationStateType)
+ state = getRuntimeContext.getState(stateDescriptor)
+ }
+
+ override def processElement(
+ input: Row,
+ ctx: ProcessFunction[Row, Row]#Context,
+ out: Collector[Row]): Unit = {
+
+ var i = 0
+
+ var accumulators = state.value()
+
+ if (null == accumulators) {
+ accumulators = new Row(aggregates.length)
+ i = 0
+ while (i < aggregates.length) {
+ accumulators.setField(i, aggregates(i).createAccumulator())
+ i += 1
+ }
+ }
+
+ // Set group keys value to the final output
+ i = 0
+ while (i < groupings.length) {
+ output.setField(i, input.getField(groupings(i)))
+ i += 1
+ }
+
+ // Set aggregate result to the final output
+ i = 0
+ while (i < aggregates.length) {
+ val index = groupings.length + i
+ val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
+ aggregates(i).accumulate(accumulator, input.getField(aggFields(i)(0)))
+ output.setField(index, aggregates(i).getValue(accumulator))
+ i += 1
+ }
+ state.update(accumulators)
+
+ out.collect(output)
+ }
+
+}
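Since processElement emits the refreshed aggregate for every input row, the function produces an update stream. A simplified model of these semantics, with a plain map standing in for the keyed ValueState (illustrative only):

    // COUNT(a) grouped by b over b values 1, 2, 2, mirroring
    // StreamTestData.getSmall3TupleDataStream in the test below
    val counts = scala.collection.mutable.Map.empty[Long, Long]
    for (b <- Seq(1L, 2L, 2L)) {
      val c = counts.getOrElse(b, 0L) + 1  // aggregates(i).accumulate(...)
      counts(b) = c                        // state.update(accumulators)
      println(s"$b,$c")                    // out.collect(output)
    }
    // prints 1,1 then 2,1 then 2,2; the later 2,2 supersedes 2,1 for key 2,
    // matching the expected results asserted in the new SqlITCase test

The output for key 2 appears twice, so downstream consumers must treat the stream as updates, which is the concern of the table-retraction branch this commit lands on.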
http://git-wip-us.apache.org/repos/asf/flink/blob/ff262508/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
index 4d0d9aa..ebdf9de 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
@@ -231,7 +231,7 @@ class FieldProjectionTest extends TableTestBase {
val expected =
unaryNode(
- "DataStreamAggregate",
+ "DataStreamGroupWindowAggregate",
unaryNode(
"DataStreamCalc",
streamTableNode(0),
@@ -259,7 +259,7 @@ class FieldProjectionTest extends TableTestBase {
val expected = unaryNode(
"DataStreamCalc",
unaryNode(
- "DataStreamAggregate",
+ "DataStreamGroupWindowAggregate",
unaryNode(
"DataStreamCalc",
streamTableNode(0),
http://git-wip-us.apache.org/repos/asf/flink/blob/ff262508/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
index 67d13b0..f7bdccf 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
@@ -47,6 +47,27 @@ class SqlITCase extends StreamingWithStateTestBase {
(8L, 8, "Hello World"),
(20L, 20, "Hello World"))
+ /** test unbounded groupby (without window) **/
+ @Test
+ def testUnboundedGroupby(): Unit = {
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val sqlQuery = "SELECT b, COUNT(a) FROM MyTable GROUP BY b"
+
+ val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("MyTable", t)
+
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = mutable.MutableList("1,1", "2,1", "2,2")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
/** test selection **/
@Test
def testSelectExpressionFromTable(): Unit = {
http://git-wip-us.apache.org/repos/asf/flink/blob/ff262508/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
index 1c1752f..8fa9e6f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
@@ -17,8 +17,6 @@
*/
package org.apache.flink.table.api.scala.stream.sql
-import java.sql.Timestamp
-
import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableException
import org.apache.flink.table.api.scala._
@@ -63,7 +61,7 @@ class WindowAggregateTest extends TableTestBase {
val sqlQuery = "SELECT a, AVG(c) OVER (PARTITION BY a ORDER BY procTime()" +
"RANGE BETWEEN INTERVAL '2' HOUR PRECEDING AND CURRENT ROW) AS avgA " +
"FROM MyTable"
- val expected =
+ val expected =
unaryNode(
"DataStreamCalc",
unaryNode(
@@ -73,7 +71,7 @@ class WindowAggregateTest extends TableTestBase {
streamTableNode(0),
term("select", "a", "c", "PROCTIME() AS $2")
),
- term("partitionBy","a"),
+ term("partitionBy", "a"),
term("orderBy", "PROCTIME"),
term("range", "BETWEEN 7200000 PRECEDING AND CURRENT ROW"),
term("select", "a", "c", "PROCTIME", "COUNT(c) AS w0$o0", "$SUM0(c) AS w0$o1")
@@ -84,6 +82,27 @@ class WindowAggregateTest extends TableTestBase {
streamUtil.verifySql(sqlQuery, expected)
}
+ @Test
+ def testGroupbyWithoutWindow() = {
+ val sql = "SELECT COUNT(a) FROM MyTable GROUP BY b"
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamGroupAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "b", "a")
+ ),
+ term("groupBy", "b"),
+ term("select", "b", "COUNT(a) AS EXPR$0")
+ ),
+ term("select", "EXPR$0")
+ )
+ streamUtil.verifySql(sql, expected)
+ }
+
@Test
def testNonPartitionedTumbleWindow() = {
val sql = "SELECT COUNT(*) FROM MyTable GROUP BY FLOOR(rowtime() TO HOUR)"
@@ -91,7 +109,7 @@ class WindowAggregateTest extends TableTestBase {
unaryNode(
"DataStreamCalc",
unaryNode(
- "DataStreamAggregate",
+ "DataStreamGroupWindowAggregate",
unaryNode(
"DataStreamCalc",
streamTableNode(0),
@@ -112,7 +130,7 @@ class WindowAggregateTest extends TableTestBase {
unaryNode(
"DataStreamCalc",
unaryNode(
- "DataStreamAggregate",
+ "DataStreamGroupWindowAggregate",
unaryNode(
"DataStreamCalc",
streamTableNode(0),
@@ -134,7 +152,7 @@ class WindowAggregateTest extends TableTestBase {
unaryNode(
"DataStreamCalc",
unaryNode(
- "DataStreamAggregate",
+ "DataStreamGroupWindowAggregate",
unaryNode(
"DataStreamCalc",
streamTableNode(0),
@@ -156,7 +174,7 @@ class WindowAggregateTest extends TableTestBase {
unaryNode(
"DataStreamCalc",
unaryNode(
- "DataStreamAggregate",
+ "DataStreamGroupWindowAggregate",
unaryNode(
"DataStreamCalc",
streamTableNode(0),
@@ -177,7 +195,7 @@ class WindowAggregateTest extends TableTestBase {
unaryNode(
"DataStreamCalc",
unaryNode(
- "DataStreamAggregate",
+ "DataStreamGroupWindowAggregate",
unaryNode(
"DataStreamCalc",
streamTableNode(0),
@@ -199,7 +217,7 @@ class WindowAggregateTest extends TableTestBase {
unaryNode(
"DataStreamCalc",
unaryNode(
- "DataStreamAggregate",
+ "DataStreamGroupWindowAggregate",
unaryNode(
"DataStreamCalc",
streamTableNode(0),
@@ -222,7 +240,7 @@ class WindowAggregateTest extends TableTestBase {
unaryNode(
"DataStreamCalc",
unaryNode(
- "DataStreamAggregate",
+ "DataStreamGroupWindowAggregate",
unaryNode(
"DataStreamCalc",
streamTableNode(0),
@@ -280,13 +298,6 @@ class WindowAggregateTest extends TableTestBase {
streamUtil.verifySql(sql, expected)
}
- @Test(expected = classOf[TableException])
- def testInvalidWindowExpression() = {
- val sql = "SELECT COUNT(*) FROM MyTable GROUP BY FLOOR(localTimestamp TO HOUR)"
- val expected = ""
- streamUtil.verifySql(sql, expected)
- }
-
@Test
def testUnboundPartitionedProcessingWindowWithRange() = {
val sql = "SELECT " +
http://git-wip-us.apache.org/repos/asf/flink/blob/ff262508/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala
deleted file mode 100644
index 3e7b66b..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.stream.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.stream.table.AggregationsITCase.TimestampWithEqualWatermark
-import org.apache.flink.table.api.scala.stream.utils.StreamITCase
-import org.apache.flink.types.Row
-import org.junit.Assert._
-import org.junit.Test
-
-import scala.collection.mutable
-
-/**
- * We only test some aggregations until better testing of constructed DataStream
- * programs is possible.
- */
-class AggregationsITCase extends StreamingMultipleProgramsTestBase {
-
- val data = List(
- (1L, 1, "Hi"),
- (2L, 2, "Hello"),
- (4L, 2, "Hello"),
- (8L, 3, "Hello world"),
- (16L, 3, "Hello world"))
-
- @Test
- def testProcessingTimeSlidingGroupWindowOverCount(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
-
- val stream = env.fromCollection(data)
- val table = stream.toTable(tEnv, 'long, 'int, 'string)
-
- val windowedTable = table
- .window(Slide over 2.rows every 1.rows as 'w)
- .groupBy('w, 'string)
- .select('string, 'int.count, 'int.avg)
-
- val results = windowedTable.toDataStream[Row]
- results.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = Seq("Hello world,1,3", "Hello world,2,3", "Hello,1,2", "Hello,2,2", "Hi,1,1")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testEventTimeSessionGroupWindowOverTime(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
-
- val stream = env
- .fromCollection(data)
- .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
- val table = stream.toTable(tEnv, 'long, 'int, 'string)
-
- val windowedTable = table
- .window(Session withGap 7.milli on 'rowtime as 'w)
- .groupBy('w, 'string)
- .select('string, 'int.count)
-
- val results = windowedTable.toDataStream[Row]
- results.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = Seq("Hello world,1", "Hello world,1", "Hello,2", "Hi,1")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testAllProcessingTimeTumblingGroupWindowOverCount(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
-
- val stream = env.fromCollection(data)
- val table = stream.toTable(tEnv, 'long, 'int, 'string)
-
- val windowedTable = table
- .window(Tumble over 2.rows as 'w)
- .groupBy('w)
- .select('int.count)
-
- val results = windowedTable.toDataStream[Row]
- results.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = Seq("2", "2")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testEventTimeTumblingWindow(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
-
- val stream = env
- .fromCollection(data)
- .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
- val table = stream.toTable(tEnv, 'long, 'int, 'string)
-
- val windowedTable = table
- .window(Tumble over 5.milli on 'rowtime as 'w)
- .groupBy('w, 'string)
- .select('string, 'int.count, 'int.avg, 'int.min, 'int.max, 'int.sum, 'w.start, 'w.end)
-
- val results = windowedTable.toDataStream[Row]
- results.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = Seq(
- "Hello world,1,3,3,3,3,1970-01-01 00:00:00.005,1970-01-01 00:00:00.01",
- "Hello world,1,3,3,3,3,1970-01-01 00:00:00.015,1970-01-01 00:00:00.02",
- "Hello,2,2,2,2,4,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
- "Hi,1,1,1,1,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-}
-
-object AggregationsITCase {
- class TimestampWithEqualWatermark extends AssignerWithPunctuatedWatermarks[(Long, Int, String)] {
-
- override def checkAndGetNextWatermark(
- lastElement: (Long, Int, String),
- extractedTimestamp: Long)
- : Watermark = {
- new Watermark(extractedTimestamp)
- }
-
- override def extractTimestamp(
- element: (Long, Int, String),
- previousElementTimestamp: Long): Long = {
- element._1
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff262508/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala
new file mode 100644
index 0000000..271e90b
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+import scala.collection.mutable
+
+/**
+ * Tests for non-windowed groupBy aggregations.
+ */
+class GroupAggregationsITCase extends StreamingWithStateTestBase {
+
+ @Test
+ def testNonKeyedGroupAggregate(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStateBackend(getStateBackend)
+ env.setParallelism(1)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+ .select('a.sum, 'b.sum)
+
+ val results = t.toDataStream[Row]
+ results.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "1,1", "3,3", "6,5", "10,8", "15,11", "21,14", "28,18", "36,22", "45,26", "55,30", "66,35",
+ "78,40", "91,45", "105,50", "120,55", "136,61", "153,67", "171,73", "190,79", "210,85",
+ "231,91")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testGroupAggregate(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStateBackend(getStateBackend)
+ env.setParallelism(1)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+ .groupBy('b)
+ .select('b, 'a.sum)
+
+ val results = t.toDataStream[Row]
+ results.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "1,1", "2,2", "2,5", "3,4", "3,9", "3,15", "4,7", "4,15",
+ "4,24", "4,34", "5,11", "5,23", "5,36", "5,50", "5,65", "6,16", "6,33", "6,51", "6,70",
+ "6,90", "6,111")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testDoubleGroupAggregation(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStateBackend(getStateBackend)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+ .groupBy('b)
+ .select('a.sum as 'd, 'b)
+ .groupBy('b, 'd)
+ .select('b)
+
+ val results = t.toDataStream[Row]
+ results.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "1",
+ "2", "2",
+ "3", "3", "3",
+ "4", "4", "4", "4",
+ "5", "5", "5", "5", "5",
+ "6", "6", "6", "6", "6", "6")
+
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testGroupAggregateWithExpression(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStateBackend(getStateBackend)
+ env.setParallelism(1)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
+ .groupBy('e, 'b % 3)
+ .select('c.min, 'e, 'a.avg, 'd.count)
+
+ val results = t.toDataStream[Row]
+ results.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "0,1,1,1", "1,2,2,1", "2,1,2,1", "3,2,3,1", "1,2,2,2",
+ "5,3,3,1", "3,2,3,2", "7,1,4,1", "2,1,3,2", "3,2,3,3", "7,1,4,2", "5,3,4,2", "12,3,5,1",
+ "1,2,3,3", "14,2,5,1")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+}
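
Note: the expected results in testNonKeyedGroupAggregate above ("1,1", "3,3",
"6,5", ...) show the defining behavior of the new operator: the aggregation is
unbounded and every input row produces an updated result row. The heavy lifting
is done by the new GroupAggProcessFunction, which keeps the accumulators in
keyed state. A simplified, hypothetical sketch of that pattern for a single
running sum (the real function operates on Rows with code-generated aggregates):

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

// Keep the running aggregate in keyed state; emit one update per input row.
class RunningSumFunction extends ProcessFunction[(Long, Int), (Long, Int)] {

  private var sum: ValueState[Int] = _

  override def open(parameters: Configuration): Unit = {
    sum = getRuntimeContext.getState(
      new ValueStateDescriptor[Int]("sum", createTypeInformation[Int]))
  }

  override def processElement(
      value: (Long, Int),
      ctx: ProcessFunction[(Long, Int), (Long, Int)]#Context,
      out: Collector[(Long, Int)]): Unit = {
    // Uninitialized Int state unboxes to 0 in Scala.
    val updated = sum.value() + value._2
    sum.update(updated)
    out.collect((value._1, updated))
  }
}

// Usage sketch: stream.keyBy(_._1).process(new RunningSumFunction)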
http://git-wip-us.apache.org/repos/asf/flink/blob/ff262508/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsTest.scala
new file mode 100644
index 0000000..1f4a694
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsTest.scala
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.table
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+import org.apache.flink.table.api.scala._
+import org.apache.flink.api.scala._
+import org.apache.flink.table.utils.TableTestUtil._
+
+class GroupAggregationsTest extends TableTestBase {
+
+ @Test(expected = classOf[ValidationException])
+ def testGroupingOnNonExistentField(): Unit = {
+ val util = streamTestUtil()
+ val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+
+ val ds = table
+ // must fail. '_foo is not a valid field
+ .groupBy('_foo)
+ .select('a.avg)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testGroupingInvalidSelection(): Unit = {
+ val util = streamTestUtil()
+ val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+
+ val ds = table
+ .groupBy('a, 'b)
+ // must fail. 'c is not a grouping key or aggregation
+ .select('c)
+ }
+
+ @Test
+ def testGroupbyWithoutWindow(): Unit = {
+ val util = streamTestUtil()
+ val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+
+ val resultTable = table
+ .groupBy('b)
+ .select('a.count)
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamGroupAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "b")
+ ),
+ term("groupBy", "b"),
+ term("select", "b", "COUNT(a) AS TMP_0")
+ ),
+ term("select", "TMP_0")
+ )
+ util.verifyTable(resultTable, expected)
+ }
+
+
+ @Test
+ def testGroupAggregateWithConstant1(): Unit = {
+ val util = streamTestUtil()
+ val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+
+ val resultTable = table
+ .select('a, 4 as 'four, 'b)
+ .groupBy('four, 'a)
+ .select('four, 'b.sum)
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamGroupAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "4 AS four", "b", "a")
+ ),
+ term("groupBy", "four", "a"),
+ term("select", "four", "a", "SUM(b) AS TMP_0")
+ ),
+ term("select", "4 AS four", "TMP_0")
+ )
+ util.verifyTable(resultTable, expected)
+ }
+
+ @Test
+ def testGroupAggregateWithConstant2(): Unit = {
+ val util = streamTestUtil()
+ val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+
+ val resultTable = table
+ .select('b, 4 as 'four, 'a)
+ .groupBy('b, 'four)
+ .select('four, 'a.sum)
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamGroupAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "4 AS four", "a", "b")
+ ),
+ term("groupBy", "four", "b"),
+ term("select", "four", "b", "SUM(a) AS TMP_0")
+ ),
+ term("select", "4 AS four", "TMP_0")
+ )
+ util.verifyTable(resultTable, expected)
+ }
+
+ @Test
+ def testGroupAggregateWithExpressionInSelect(): Unit = {
+ val util = streamTestUtil()
+ val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+
+ val resultTable = table
+ .select('a as 'a, 'b % 3 as 'd, 'c as 'c)
+ .groupBy('d)
+ .select('c.min, 'a.avg)
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamGroupAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "c", "a", "MOD(b, 3) AS d")
+ ),
+ term("groupBy", "d"),
+ term("select", "d", "MIN(c) AS TMP_0", "AVG(a) AS TMP_1")
+ ),
+ term("select", "TMP_0", "TMP_1")
+ )
+ util.verifyTable(resultTable, expected)
+ }
+
+ @Test
+ def testGroupAggregateWithFilter(): Unit = {
+ val util = streamTestUtil()
+ val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+
+ val resultTable = table
+ .groupBy('b)
+ .select('b, 'a.sum)
+ .where('b === 2)
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamGroupAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "b", "a")
+ ),
+ term("groupBy", "b"),
+ term("select", "b", "SUM(a) AS TMP_0")
+ ),
+ term("select", "b", "TMP_0"),
+ term("where", "=(b, 2)")
+ )
+ util.verifyTable(resultTable, expected)
+ }
+
+ @Test
+ def testGroupAggregateWithAverage(): Unit = {
+ val util = streamTestUtil()
+ val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+
+ val resultTable = table
+ .groupBy('b)
+ .select('b, 'a.cast(BasicTypeInfo.DOUBLE_TYPE_INFO).avg)
+
+ val expected =
+ unaryNode(
+ "DataStreamGroupAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "b", "a", "CAST(a) AS a0")
+ ),
+ term("groupBy", "b"),
+ term("select", "b", "AVG(a0) AS TMP_0")
+ )
+
+ util.verifyTable(resultTable, expected)
+ }
+
+}
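
Note: the plan assertions above expect Table API aggregates to be aliased
TMP_n, while the SQL variant of testGroupbyWithoutWindow earlier in this
commit expects EXPR$n for unnamed select items. A side-by-side sketch
(assuming a Table named table with fields a, b, c, registered as "MyTable"
in a stream TableEnvironment tEnv):

// Table API: unnamed aggregates become TMP_n in the plan.
val t1 = table
  .groupBy('b)
  .select('a.count)                                             // COUNT(a) AS TMP_0

// SQL: unnamed select items become EXPR$n.
val t2 = tEnv.sql("SELECT COUNT(a) FROM MyTable GROUP BY b")    // COUNT(a) AS EXPR$0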
http://git-wip-us.apache.org/repos/asf/flink/blob/ff262508/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
new file mode 100644
index 0000000..b8fc49b
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.table.GroupWindowAggregationsITCase.TimestampWithEqualWatermark
+import org.apache.flink.table.api.scala.stream.utils.StreamITCase
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+/**
+ * We only test some aggregations until better testing of constructed DataStream
+ * programs is possible.
+ */
+class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase {
+
+ val data = List(
+ (1L, 1, "Hi"),
+ (2L, 2, "Hello"),
+ (4L, 2, "Hello"),
+ (8L, 3, "Hello world"),
+ (16L, 3, "Hello world"))
+
+ @Test
+ def testProcessingTimeSlidingGroupWindowOverCount(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val stream = env.fromCollection(data)
+ val table = stream.toTable(tEnv, 'long, 'int, 'string)
+
+ val windowedTable = table
+ .window(Slide over 2.rows every 1.rows as 'w)
+ .groupBy('w, 'string)
+ .select('string, 'int.count, 'int.avg)
+
+ val results = windowedTable.toDataStream[Row]
+ results.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = Seq("Hello world,1,3", "Hello world,2,3", "Hello,1,2", "Hello,2,2", "Hi,1,1")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testEventTimeSessionGroupWindowOverTime(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val stream = env
+ .fromCollection(data)
+ .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+ val table = stream.toTable(tEnv, 'long, 'int, 'string)
+
+ val windowedTable = table
+ .window(Session withGap 7.milli on 'rowtime as 'w)
+ .groupBy('w, 'string)
+ .select('string, 'int.count)
+
+ val results = windowedTable.toDataStream[Row]
+ results.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = Seq("Hello world,1", "Hello world,1", "Hello,2", "Hi,1")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testAllProcessingTimeTumblingGroupWindowOverCount(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val stream = env.fromCollection(data)
+ val table = stream.toTable(tEnv, 'long, 'int, 'string)
+
+ val windowedTable = table
+ .window(Tumble over 2.rows as 'w)
+ .groupBy('w)
+ .select('int.count)
+
+ val results = windowedTable.toDataStream[Row]
+ results.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = Seq("2", "2")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testEventTimeTumblingWindow(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val stream = env
+ .fromCollection(data)
+ .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+ val table = stream.toTable(tEnv, 'long, 'int, 'string)
+
+ val windowedTable = table
+ .window(Tumble over 5.milli on 'rowtime as 'w)
+ .groupBy('w, 'string)
+ .select('string, 'int.count, 'int.avg, 'int.min, 'int.max, 'int.sum, 'w.start, 'w.end)
+
+ val results = windowedTable.toDataStream[Row]
+ results.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = Seq(
+ "Hello world,1,3,3,3,3,1970-01-01 00:00:00.005,1970-01-01 00:00:00.01",
+ "Hello world,1,3,3,3,3,1970-01-01 00:00:00.015,1970-01-01 00:00:00.02",
+ "Hello,2,2,2,2,4,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
+ "Hi,1,1,1,1,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+}
+
+object GroupWindowAggregationsITCase {
+ class TimestampWithEqualWatermark extends AssignerWithPunctuatedWatermarks[(Long, Int, String)] {
+
+ override def checkAndGetNextWatermark(
+ lastElement: (Long, Int, String),
+ extractedTimestamp: Long)
+ : Watermark = {
+ new Watermark(extractedTimestamp)
+ }
+
+ override def extractTimestamp(
+ element: (Long, Int, String),
+ previousElementTimestamp: Long): Long = {
+ element._1
+ }
+ }
+}
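
Note: the expectations in testEventTimeSessionGroupWindowOverTime follow
directly from the data list and the 7 ms session gap; tracing the event
timestamps per key:

  "Hi"          at t = 1      -> single-element session       -> "Hi,1"
  "Hello"       at t = 2, 4   -> 2 ms apart, merged           -> "Hello,2"
  "Hello world" at t = 8, 16  -> 8 ms apart (> 7 ms gap),
                                 two sessions                 -> "Hello world,1" (twice)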