You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/06/30 20:58:26 UTC
flink git commit: [FLINK-6075] [table] Add ORDER BY support for
streaming table.
Repository: flink
Updated Branches:
refs/heads/master d7d10a130 -> b8c8f204d
[FLINK-6075] [table] Add ORDER BY support for streaming table.
This closes #3889.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b8c8f204
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b8c8f204
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b8c8f204
Branch: refs/heads/master
Commit: b8c8f204de718e6d5b7c3df837deafaed7c375f5
Parents: d7d10a1
Author: rtudoran <tu...@ymail.com>
Authored: Fri May 12 20:41:30 2017 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Jun 30 22:55:51 2017 +0200
----------------------------------------------------------------------
docs/dev/table/sql.md | 6 +-
.../calcite/RelTimeIndicatorConverter.scala | 7 +-
.../flink/table/plan/nodes/CommonSort.scala | 114 ++++++++
.../table/plan/nodes/dataset/DataSetSort.scala | 53 +---
.../plan/nodes/datastream/DataStreamSort.scala | 191 +++++++++++++
.../flink/table/plan/rules/FlinkRuleSets.scala | 1 +
.../rules/datastream/DataStreamSortRule.scala | 85 ++++++
.../aggregate/ProcTimeSortProcessFunction.scala | 101 +++++++
.../aggregate/RowTimeSortProcessFunction.scala | 139 ++++++++++
.../table/runtime/aggregate/SortUtil.scala | 193 +++++++++++++
.../api/scala/stream/sql/OverWindowITCase.scala | 18 +-
.../table/api/scala/stream/sql/SortITCase.scala | 122 ++++++++
.../table/api/scala/stream/sql/SortTest.scala | 81 ++++++
.../api/scala/stream/sql/TimeTestUtil.scala | 39 +++
.../aggregate/TimeSortProcessFunctionTest.scala | 276 +++++++++++++++++++
15 files changed, 1367 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b8c8f204/docs/dev/table/sql.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index a736477..be586f1 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -486,13 +486,15 @@ FROM (
<tr>
<td>
<strong>Order By</strong><br>
- <span class="label label-primary">Batch</span>
+ <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
</td>
<td>
+<b>Note:</b> The result of streaming queries must be primarily sorted on an ascending <a href="streaming.html#time-attributes">time attribute</a>. Additional sorting attributes are supported.
+
{% highlight sql %}
SELECT *
FROM Orders
-ORDER BY users
+ORDER BY orderTime
{% endhighlight %}
</td>
</tr>
http://git-wip-us.apache.org/repos/asf/flink/blob/b8c8f204/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
index b28e3f8..385cab2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
@@ -92,8 +92,11 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
override def visit(minus: LogicalMinus): RelNode =
throw new TableException("Logical minus in a stream environment is not supported yet.")
- override def visit(sort: LogicalSort): RelNode =
- throw new TableException("Logical sort in a stream environment is not supported yet.")
+  // Rewrites the sort's input with this shuttle and re-creates the LogicalSort on top of the
+  // converted input, passing the original collation, offset and fetch through unchanged.
+  override def visit(sort: LogicalSort): RelNode =
+    LogicalSort.create(sort.getInput.accept(this), sort.collation, sort.offset, sort.fetch)
override def visit(`match`: LogicalMatch): RelNode =
throw new TableException("Logical match in a stream environment is not supported yet.")
http://git-wip-us.apache.org/repos/asf/flink/blob/b8c8f204/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonSort.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonSort.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonSort.scala
new file mode 100644
index 0000000..2ad4083
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonSort.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes
+
+import org.apache.calcite.rex.{RexLiteral, RexNode}
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.calcite.rel.`type`._
+import scala.collection.JavaConverters._
+import org.apache.flink.api.common.operators.Order
+import org.apache.calcite.rel.{RelWriter, RelCollation}
+
+/**
+  * Common methods for Flink sort operators.
+  *
+  * Bundles the helpers that translate Calcite sort directions and offset/fetch literals, and
+  * that render a sort specification for `toString` and for plan explanation.
+  */
+trait CommonSort {
+
+  /** Renders the offset expression via its string representation. */
+  private def offsetToString(offset: RexNode): String = s"$offset"
+
+  /**
+    * Renders every sort field as "<field name> <order short name>", joined by ", ".
+    *
+    * @param collationSort  the collation holding the field indexes and directions
+    * @param rowRelDataType the row type used to resolve field indexes to field names
+    */
+  private def sortFieldsToString(
+      collationSort: RelCollation,
+      rowRelDataType: RelDataType): String = {
+    val fieldNames = rowRelDataType.getFieldNames
+    collationSort.getFieldCollations.asScala
+      .map(c => (c.getFieldIndex, directionToOrder(c.getDirection)))
+      .map { case (index, order) => s"${fieldNames.get(index)} ${order.getShortName}" }
+      .mkString(", ")
+  }
+
+  /**
+    * Maps a Calcite sort direction to a Flink [[Order]].
+    *
+    * @throws IllegalArgumentException if the direction is neither (strictly) ascending nor
+    *                                  (strictly) descending
+    */
+  private[flink] def directionToOrder(direction: Direction): Order = {
+    direction match {
+      case Direction.ASCENDING | Direction.STRICTLY_ASCENDING => Order.ASCENDING
+      case Direction.DESCENDING | Direction.STRICTLY_DESCENDING => Order.DESCENDING
+      case _ => throw new IllegalArgumentException("Unsupported direction.")
+    }
+  }
+
+  /** Renders the end of the fetch range, or "unlimited" if no fetch bound is set. */
+  private def fetchToString(fetch: RexNode, offset: RexNode): String = {
+    val limitEnd = getFetchLimitEnd(fetch, offset)
+    if (limitEnd == Long.MaxValue) "unlimited" else s"$limitEnd"
+  }
+
+  /**
+    * Returns the exclusive end of the fetch range, i.e. the fetch count plus the limit start,
+    * or `Long.MaxValue` if `fetch` is null.
+    */
+  private[flink] def getFetchLimitEnd (fetch: RexNode, offset: RexNode): Long = {
+    if (fetch != null) {
+      RexLiteral.intValue(fetch) + getFetchLimitStart(offset)
+    } else {
+      Long.MaxValue
+    }
+  }
+
+  /** Returns the integer value of the offset literal, or 0 if `offset` is null. */
+  private[flink] def getFetchLimitStart (offset: RexNode): Long = {
+    if (offset != null) {
+      RexLiteral.intValue(offset)
+    } else {
+      0L
+    }
+  }
+
+  /**
+    * Builds the textual description of a sort, e.g. `Sort(by: (a ASC), offset: 2, fetch: 7)`.
+    *
+    * The offset and fetch terms are only included when the respective expression is not null.
+    * The helper calls are wrapped in `${...}` so that they are actually evaluated; a bare
+    * `$$name(...)` inside an interpolated string only yields the literal text.
+    */
+  private[flink] def sortToString(
+      rowRelDataType: RelDataType,
+      sortCollation: RelCollation,
+      sortOffset: RexNode,
+      sortFetch: RexNode): String = {
+    val byTerm = s"by: (${sortFieldsToString(sortCollation, rowRelDataType)})"
+    val offsetTerm = Option(sortOffset).map(o => s"offset: ${offsetToString(o)}")
+    val fetchTerm = Option(sortFetch).map(f => s"fetch: ${fetchToString(f, sortOffset)}")
+
+    // joining the present terms keeps separators and the closing parenthesis balanced
+    (Seq(byTerm) ++ offsetTerm ++ fetchTerm).mkString("Sort(", ", ", ")")
+  }
+
+  /**
+    * Adds the "orderBy" item and, if the respective expression is not null, the "offset" and
+    * "fetch" items to the given writer and returns the writer.
+    */
+  private[flink] def sortExplainTerms(
+      pw: RelWriter,
+      rowRelDataType: RelDataType,
+      sortCollation: RelCollation,
+      sortOffset: RexNode,
+      sortFetch: RexNode) : RelWriter = {
+
+    pw
+      .item("orderBy", sortFieldsToString(sortCollation, rowRelDataType))
+      .itemIf("offset", offsetToString(sortOffset), sortOffset != null)
+      .itemIf("fetch", fetchToString(sortFetch, sortOffset), sortFetch != null)
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b8c8f204/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala
index 192237a..aa6620d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala
@@ -31,6 +31,7 @@ import org.apache.flink.api.java.DataSet
import org.apache.flink.table.api.{BatchTableEnvironment, TableException}
import org.apache.flink.table.runtime.{CountPartitionFunction, LimitFilterFunction}
import org.apache.flink.types.Row
+import org.apache.flink.table.plan.nodes.CommonSort
import scala.collection.JavaConverters._
@@ -43,19 +44,12 @@ class DataSetSort(
offset: RexNode,
fetch: RexNode)
extends SingleRel(cluster, traitSet, inp)
+ with CommonSort
with DataSetRel {
- private val limitStart: Long = if (offset != null) {
- RexLiteral.intValue(offset)
- } else {
- 0L
- }
+ private val limitStart: Long = getFetchLimitStart(offset)
- private val limitEnd: Long = if (fetch != null) {
- RexLiteral.intValue(fetch) + limitStart
- } else {
- Long.MaxValue
- }
+ private val limitEnd: Long = getFetchLimitEnd(fetch, offset)
override def deriveRowType(): RelDataType = rowRelDataType
@@ -127,7 +121,7 @@ class DataSetSort(
limitEnd,
broadcastName)
- val limitName = s"offset: $offsetToString, fetch: $fetchToString"
+ val limitName = s"offset: $$offsetToString(offset), fetch: $$fetchToString(fetch, offset))"
partitionedDs
.filter(limitFunction)
@@ -136,36 +130,19 @@ class DataSetSort(
}
}
- private def directionToOrder(direction: Direction) = {
- direction match {
- case Direction.ASCENDING | Direction.STRICTLY_ASCENDING => Order.ASCENDING
- case Direction.DESCENDING | Direction.STRICTLY_DESCENDING => Order.DESCENDING
- case _ => throw new IllegalArgumentException("Unsupported direction.")
- }
-
- }
-
private val fieldCollations = collations.getFieldCollations.asScala
.map(c => (c.getFieldIndex, directionToOrder(c.getDirection)))
-
- private val sortFieldsToString = fieldCollations
- .map(col => s"${getRowType.getFieldNames.get(col._1)} ${col._2.getShortName}" ).mkString(", ")
-
- private val offsetToString = s"$offset"
-
- private val fetchToString = if (limitEnd == Long.MaxValue) {
- "unlimited"
- } else {
- s"$limitEnd"
+
+ override def toString: String = {
+ sortToString(getRowType, collations, offset, fetch)
}
-
- override def toString: String =
- s"Sort(by: ($sortFieldsToString), offset: $offsetToString, fetch: $fetchToString)"
-
+
override def explainTerms(pw: RelWriter) : RelWriter = {
- super.explainTerms(pw)
- .item("orderBy", sortFieldsToString)
- .item("offset", offsetToString)
- .item("fetch", fetchToString)
+ sortExplainTerms(
+ super.explainTerms(pw),
+ getRowType,
+ collations,
+ offset,
+ fetch)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b8c8f204/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala
new file mode 100644
index 0000000..a11e6c1
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.{ RelNode, RelWriter }
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.calcite.rel.RelCollation
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException}
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.plan.nodes.CommonSort
+import org.apache.calcite.rel.core.Sort
+
+/**
+  * DataStream RelNode for a sort whose primary sort attribute is an ascending time attribute.
+  *
+  * Secondary sort fields are handled by process functions created through [[SortUtil]];
+  * offset and fetch are rejected during translation.
+  */
+class DataStreamSort(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputNode: RelNode,
+    inputSchema: RowSchema,
+    schema: RowSchema,
+    sortCollation: RelCollation,
+    sortOffset: RexNode,
+    sortFetch: RexNode,
+    description: String)
+  extends Sort(cluster, traitSet, inputNode, sortCollation, sortOffset, sortFetch)
+  with CommonSort
+  with DataStreamRel {
+
+  override def deriveRowType(): RelDataType = schema.logicalType
+
+  /** Creates a copy with the given traits, input, collation, offset and fetch. */
+  override def copy(
+      traitSet: RelTraitSet,
+      input: RelNode,
+      newCollation: RelCollation,
+      offset: RexNode,
+      fetch: RexNode): Sort =
+    new DataStreamSort(
+      cluster,
+      traitSet,
+      input,
+      inputSchema,
+      schema,
+      newCollation,
+      offset,
+      fetch,
+      description)
+
+  override def toString: String =
+    sortToString(schema.logicalType, sortCollation, sortOffset, sortFetch)
+
+  override def explainTerms(pw: RelWriter) : RelWriter = {
+    val withInput = pw.input("input", getInput())
+    sortExplainTerms(withInput, schema.logicalType, sortCollation, sortOffset, sortFetch)
+  }
+
+  /**
+    * Translates the sort into a DataStream program.
+    *
+    * @throws TableException if the first sort direction is not ascending, if the first sort
+    *                        field is neither a proctime nor a rowtime indicator, or if an
+    *                        offset or fetch is present
+    */
+  override def translateToPlan(
+      tableEnv: StreamTableEnvironment,
+      queryConfig: StreamQueryConfig): DataStream[CRow] = {
+
+    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
+
+    // the type of the first sort field decides which time semantics are used
+    val timeType = SortUtil.getFirstSortField(sortCollation, schema.logicalType).getType
+
+    // the time attribute must be sorted in ascending order
+    if (SortUtil.getFirstSortDirection(sortCollation) != Direction.ASCENDING) {
+      throw new TableException("Primary sort order of a streaming table must be ascending on time.")
+    }
+
+    val execCfg = tableEnv.execEnv.getConfig
+
+    val hasOffset = sortOffset != null
+    val hasFetch = sortFetch != null
+
+    if (FlinkTypeFactory.isProctimeIndicatorType(timeType)) {
+      if (hasOffset && hasFetch) {
+        // offset and fetch need retraction
+        throw new TableException(
+          "Streaming tables do not support sort with offset and fetch.")
+      } else if (hasFetch) {
+        // fetch needs retraction
+        throw new TableException("Streaming tables do not support sort with fetch.")
+      } else if (hasOffset) {
+        // offset needs retraction
+        throw new TableException("Streaming tables do not support sort with offset.")
+      } else {
+        createSortProcTime(inputDS, execCfg)
+      }
+    } else if (FlinkTypeFactory.isRowtimeIndicatorType(timeType)) {
+      if (hasOffset && hasFetch) {
+        // offset and fetch need retraction
+        throw new TableException(
+          "Streaming tables do not support sort with offset and fetch")
+      } else if (hasFetch) {
+        // fetch needs retraction
+        throw new TableException("Streaming tables do not support sort with fetch")
+      } else if (hasOffset) {
+        // offset needs retraction
+        throw new TableException("Streaming tables do not support sort with offset")
+      } else {
+        createSortRowTime(inputDS, execCfg)
+      }
+    } else {
+      throw new TableException(
+        "Primary sort order of a streaming table must be ascending on time.")
+    }
+  }
+
+  /**
+    * Create Sort logic based on processing time
+    */
+  def createSortProcTime(
+      inputDS: DataStream[CRow],
+      execCfg: ExecutionConfig): DataStream[CRow] = {
+
+    val returnTypeInfo = CRowTypeInfo(schema.physicalTypeInfo)
+    val hasSecondarySortFields = sortCollation.getFieldCollations.size() > 1
+
+    if (hasSecondarySortFields) {
+      // rows have to be sorted on the fields that follow the proctime attribute
+      val processFunction = SortUtil.createProcTimeSortFunction(
+        sortCollation,
+        inputSchema.logicalType,
+        inputSchema.physicalTypeInfo,
+        execCfg)
+
+      val keyed = inputDS.keyBy(new NullByteKeySelector[CRow])
+      keyed
+        .process(processFunction)
+        .setParallelism(1)
+        .setMaxParallelism(1)
+        .returns(returnTypeInfo)
+        .asInstanceOf[DataStream[CRow]]
+    } else {
+      // sorting on proctime only: the elements are just forwarded
+      inputDS
+        .map(new IdentityCRowMap())
+        .setParallelism(1)
+        .setMaxParallelism(1)
+        .returns(returnTypeInfo)
+        .asInstanceOf[DataStream[CRow]]
+    }
+  }
+
+  /**
+    * Create Sort logic based on row time
+    */
+  def createSortRowTime(
+      inputDS: DataStream[CRow],
+      execCfg: ExecutionConfig): DataStream[CRow] = {
+
+    val returnTypeInfo = CRowTypeInfo(schema.physicalTypeInfo)
+
+    val processFunction = SortUtil.createRowTimeSortFunction(
+      sortCollation,
+      inputSchema.logicalType,
+      inputSchema.physicalTypeInfo,
+      execCfg)
+
+    val keyed = inputDS.keyBy(new NullByteKeySelector[CRow])
+    keyed
+      .process(processFunction)
+      .setParallelism(1)
+      .setMaxParallelism(1)
+      .returns(returnTypeInfo)
+      .asInstanceOf[DataStream[CRow]]
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b8c8f204/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index bb3833a..ebfbeb9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -182,6 +182,7 @@ object FlinkRuleSets {
*/
val DATASTREAM_OPT_RULES: RuleSet = RuleSets.ofList(
// translate to DataStream nodes
+ DataStreamSortRule.INSTANCE,
DataStreamGroupAggregateRule.INSTANCE,
DataStreamOverAggregateRule.INSTANCE,
DataStreamGroupWindowAggregateRule.INSTANCE,
http://git-wip-us.apache.org/repos/asf/flink/blob/b8c8f204/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSortRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSortRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSortRule.scala
new file mode 100644
index 0000000..17a643a
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSortRule.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.flink.table.plan.nodes.datastream.DataStreamSort
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalSort
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.flink.table.runtime.aggregate.SortUtil
+
+/**
+  * Rule to convert a LogicalSort into a DataStreamSort.
+  *
+  * The rule only matches if the first sort attribute is a time indicator that is sorted in
+  * ascending order.
+  */
+class DataStreamSortRule
+  extends ConverterRule(
+    classOf[FlinkLogicalSort],
+    FlinkConventions.LOGICAL,
+    FlinkConventions.DATASTREAM,
+    "DataStreamSortRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val sort: FlinkLogicalSort = call.rel(0).asInstanceOf[FlinkLogicalSort]
+    // Check if first sort attribute is time attribute and order is ascending
+    checkTimeOrder(sort)
+  }
+
+  /** Converts the matched sort and its input to the DATASTREAM convention. */
+  override def convert(rel: RelNode): RelNode = {
+    val sort: FlinkLogicalSort = rel.asInstanceOf[FlinkLogicalSort]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASTREAM)
+    val convInput: RelNode = RelOptRule.convert(sort.getInput(0), FlinkConventions.DATASTREAM)
+
+    val inputRowType = convInput.asInstanceOf[RelSubset].getOriginal.getRowType
+
+    new DataStreamSort(
+      rel.getCluster,
+      traitSet,
+      convInput,
+      new RowSchema(inputRowType),
+      new RowSchema(rel.getRowType),
+      sort.collation,
+      sort.offset,
+      sort.fetch,
+      description)
+  }
+
+  /**
+    * Checks if first sort attribute is time attribute and order is ascending.
+    *
+    * @return false if the collation has no sort fields, if the first sort field is not a time
+    *         indicator, or if its direction is not ascending
+    */
+  def checkTimeOrder(sort: FlinkLogicalSort): Boolean = {
+
+    val sortCollation = sort.collation
+
+    if (sortCollation.getFieldCollations.isEmpty) {
+      // a collation without fields has no first sort attribute that could be a time attribute
+      false
+    } else {
+      // get type of first sort field
+      val firstSortType = SortUtil.getFirstSortField(sortCollation, sort.getRowType).getType
+      // get direction of first sort field
+      val firstSortDirection = SortUtil.getFirstSortDirection(sortCollation)
+
+      FlinkTypeFactory.isTimeIndicatorType(firstSortType) &&
+        firstSortDirection == Direction.ASCENDING
+    }
+  }
+}
+
+object DataStreamSortRule {
+  /** Shared instance of the rule. */
+  val INSTANCE: RelOptRule = new DataStreamSortRule
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b8c8f204/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeSortProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeSortProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeSortProcessFunction.scala
new file mode 100644
index 0000000..2d0b14b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeSortProcessFunction.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.util.{Collector, Preconditions}
+
+import java.util.ArrayList
+import java.util.Collections
+
+
+/**
+  * ProcessFunction to sort on processing time and additional attributes.
+  *
+  * Incoming rows are buffered in state. A processing-time timer registered for the next
+  * millisecond sorts the buffered rows with the given comparator and emits them.
+  *
+  * @param inputRowType The data type of the input data.
+  * @param rowComparator A comparator to sort rows.
+  */
+class ProcTimeSortProcessFunction(
+    private val inputRowType: CRowTypeInfo,
+    private val rowComparator: CollectionRowComparator)
+  extends ProcessFunction[CRow, CRow] {
+
+  Preconditions.checkNotNull(rowComparator)
+
+  // state holding the rows received since the last emission
+  private var bufferedEvents: ListState[Row] = _
+  // reusable buffer in which the rows are sorted
+  private val sortBuffer: ArrayList[Row] = new ArrayList[Row]
+
+  // reusable output record
+  private var outputC: CRow = _
+
+  override def open(config: Configuration): Unit = {
+    val sortDescriptor = new ListStateDescriptor[Row](
+      "sortState",
+      inputRowType.rowType)
+    bufferedEvents = getRuntimeContext.getListState(sortDescriptor)
+
+    outputC = new CRow()
+  }
+
+  /** Buffers the row and registers a timer that emits the buffered rows in sorted order. */
+  override def processElement(
+      inputC: CRow,
+      ctx: ProcessFunction[CRow, CRow]#Context,
+      out: Collector[CRow]): Unit = {
+
+    val input = inputC.row
+    val currentTime = ctx.timerService.currentProcessingTime
+
+    // buffer the incoming event
+    bufferedEvents.add(input)
+
+    // register a timer for the next millisecond to sort and emit buffered data
+    ctx.timerService.registerProcessingTimeTimer(currentTime + 1)
+
+  }
+
+  /** Sorts all buffered rows, emits them in order and clears the buffer state. */
+  override def onTimer(
+      timestamp: Long,
+      ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+      out: Collector[CRow]): Unit = {
+
+    // The state API allows `get` to return null when the state is empty. This can happen if
+    // an earlier timer already emitted and cleared the rows this timer was registered for.
+    val buffered = bufferedEvents.get
+
+    if (buffered != null) {
+      // insert all rows into the sort buffer
+      sortBuffer.clear()
+      val iter = buffered.iterator()
+      while (iter.hasNext) {
+        sortBuffer.add(iter.next())
+      }
+      // sort the rows
+      Collections.sort(sortBuffer, rowComparator)
+
+      // Emit the rows in order
+      var i = 0
+      while (i < sortBuffer.size) {
+        outputC.row = sortBuffer.get(i)
+        out.collect(outputC)
+        i += 1
+      }
+
+      // remove all buffered rows
+      bufferedEvents.clear()
+      // do not keep references to the emitted rows until the next timer fires
+      sortBuffer.clear()
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b8c8f204/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala
new file mode 100644
index 0000000..737f32c
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+import java.util.Collections
+import java.util.{List => JList, ArrayList => JArrayList}
+
+/**
+  * ProcessFunction to sort on event-time and possibly additional secondary sort attributes.
+  *
+  * Rows are collected per timestamp in state and emitted by an event-time timer registered
+  * for that timestamp.
+  *
+  * @param inputRowType The data type of the input data.
+  * @param rowComparator A comparator to sort rows.
+  */
+class RowTimeSortProcessFunction(
+    private val inputRowType: CRowTypeInfo,
+    private val rowComparator: Option[CollectionRowComparator])
+  extends ProcessFunction[CRow, CRow] {
+
+  Preconditions.checkNotNull(rowComparator)
+
+  // State to collect rows between watermarks.
+  private var dataState: MapState[Long, JList[Row]] = _
+
+  // State keeping the timestamp of the last fired timer. Used to filter late events.
+  private var lastTriggeringTsState: ValueState[Long] = _
+
+  // reusable output record
+  private var outputC: CRow = _
+
+  override def open(config: Configuration): Unit = {
+
+    val keyTypeInformation: TypeInformation[Long] =
+      BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
+    val valueTypeInformation: TypeInformation[JList[Row]] =
+      new ListTypeInfo[Row](inputRowType.rowType)
+
+    dataState = getRuntimeContext.getMapState(
+      new MapStateDescriptor[Long, JList[Row]](
+        "dataState",
+        keyTypeInformation,
+        valueTypeInformation))
+
+    lastTriggeringTsState = getRuntimeContext.getState(
+      new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long]))
+
+    outputC = new CRow()
+  }
+
+
+  /**
+    * Adds the row to the list stored for its timestamp. Rows whose timestamp is not greater
+    * than the last triggering timestamp are dropped.
+    */
+  override def processElement(
+      inputC: CRow,
+      ctx: ProcessFunction[CRow, CRow]#Context,
+      out: Collector[CRow]): Unit = {
+
+    val input = inputC.row
+
+    // timestamp of the processed row
+    val rowtime = ctx.timestamp
+
+    val lastTriggeringTs = lastTriggeringTsState.value
+
+    // rows that are not newer than the last triggering timestamp are late and dropped
+    if (rowtime > lastTriggeringTs) {
+      val existing = dataState.get(rowtime)
+      val isFirstRowForTimestamp = null == existing
+
+      val rows = if (isFirstRowForTimestamp) new JArrayList[Row] else existing
+      rows.add(input)
+      dataState.put(rowtime, rows)
+
+      if (isFirstRowForTimestamp) {
+        // only the first row of a timestamp registers the event time timer
+        ctx.timerService.registerEventTimeTimer(rowtime)
+      }
+    }
+  }
+
+
+  /**
+    * Emits the rows stored for the triggering timestamp, sorted by the comparator if one is
+    * defined, removes them from state and remembers the timestamp as last triggering timestamp.
+    */
+  override def onTimer(
+      timestamp: Long,
+      ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+      out: Collector[CRow]): Unit = {
+
+    // gets all rows for the triggering timestamp
+    val inputs: JList[Row] = dataState.get(timestamp)
+
+    if (inputs != null) {
+
+      // sort rows on secondary fields if necessary
+      rowComparator match {
+        case Some(comparator) => Collections.sort(inputs, comparator)
+        case None => // rows of the same timestamp keep their order
+      }
+
+      // emit rows in order
+      val rowIter = inputs.iterator()
+      while (rowIter.hasNext) {
+        outputC.row = rowIter.next()
+        out.collect(outputC)
+      }
+
+      // remove emitted rows from state
+      dataState.remove(timestamp)
+      lastTriggeringTsState.update(timestamp)
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b8c8f204/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
new file mode 100644
index 0000000..5f83f1d
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.calcite.rel.`type`._
+import org.apache.calcite.rel.RelCollation
+import org.apache.calcite.rel.RelFieldCollation
+import org.apache.calcite.rel.RelFieldCollation.Direction
+
+import org.apache.flink.types.Row
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.api.common.typeutils.TypeComparator
+import org.apache.flink.api.java.typeutils.runtime.RowComparator
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.common.typeinfo.AtomicType
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.table.plan.schema.RowSchema
+
+import java.util.Comparator
+
+import scala.collection.JavaConverters._
+
+/**
+  * Helper methods to build the sort logic for streaming tables: factories for the
+  * event-time and processing-time sort functions, the comparator for the non-time sort
+  * fields, and accessors for the first sort field of a collation.
+  */
+object SortUtil {
+
+  /**
+    * Creates a ProcessFunction to sort rows based on event time and possibly other secondary
+    * fields.
+    *
+    * The first collation is treated as the time collation. A comparator for the remaining
+    * collations is only created if the collation list has more than one entry.
+    *
+    * @param collationSort The list of sort collations.
+    * @param inputType The row type of the input.
+    * @param inputTypeInfo The type information of the input rows; wrapped into a CRowTypeInfo.
+    * @param execCfg Execution configuration to configure comparators.
+    * @return A function to sort stream values based on event-time and secondary sort fields.
+    */
+  private[flink] def createRowTimeSortFunction(
+      collationSort: RelCollation,
+      inputType: RelDataType,
+      inputTypeInfo: TypeInformation[Row],
+      execCfg: ExecutionConfig): ProcessFunction[CRow, CRow] = {
+
+    val hasSecondaryFields = collationSort.getFieldCollations.size() > 1
+    val secondaryComparator =
+      if (hasSecondaryFields) {
+        Some(createSecondaryComparator(collationSort, inputType, execCfg))
+      } else {
+        None
+      }
+
+    new RowTimeSortProcessFunction(CRowTypeInfo(inputTypeInfo), secondaryComparator)
+  }
+
+  /**
+    * Creates a ProcessFunction to sort rows based on processing time and additional fields.
+    *
+    * The first collation is treated as the time collation; the comparator for the remaining
+    * collations is always created.
+    *
+    * @param collationSort The list of sort collations.
+    * @param inputType The row type of the input.
+    * @param inputTypeInfo The type information of the input rows; wrapped into a CRowTypeInfo.
+    * @param execCfg Execution configuration to configure comparators.
+    * @return A function to sort stream values based on proctime and other secondary sort fields.
+    */
+  private[flink] def createProcTimeSortFunction(
+      collationSort: RelCollation,
+      inputType: RelDataType,
+      inputTypeInfo: TypeInformation[Row],
+      execCfg: ExecutionConfig): ProcessFunction[CRow, CRow] = {
+
+    val secondaryComparator = createSecondaryComparator(collationSort, inputType, execCfg)
+
+    new ProcTimeSortProcessFunction(CRowTypeInfo(inputTypeInfo), secondaryComparator)
+  }
+
+  /**
+    * Creates the Java comparator for all sort collations except the first one.
+    *
+    * @param collationSort The list of sort collations; the first entry is skipped.
+    * @param inputType The row type of the input.
+    * @param execCfg Execution configuration to configure comparators.
+    * @return A comparator that orders rows on the remaining sort fields.
+    */
+  private def createSecondaryComparator(
+      collationSort: RelCollation,
+      inputType: RelDataType,
+      execCfg: ExecutionConfig): CollectionRowComparator = {
+
+    // strip off time collation
+    val secondaryCollations = collationSort.getFieldCollations.asScala.tail
+
+    new CollectionRowComparator(createRowComparator(inputType, secondaryCollations, execCfg))
+  }
+
+  /**
+    * Creates a RowComparator for the provided field collations and input type.
+    *
+    * Throws a TableException if a collation is neither ascending nor descending or if the
+    * type of a sort field is not an atomic type.
+    *
+    * @param inputType the row type of the input.
+    * @param fieldCollations the field collations
+    * @param execConfig the execution configuration.
+    *
+    * @return A RowComparator for the provided sort collations and input type.
+    */
+  private def createRowComparator(
+      inputType: RelDataType,
+      fieldCollations: Seq[RelFieldCollation],
+      execConfig: ExecutionConfig): RowComparator = {
+
+    val sortFields = fieldCollations.map(_.getFieldIndex)
+
+    // true for ascending, false for descending order
+    val sortDirections = fieldCollations.map { collation =>
+      collation.direction match {
+        case Direction.ASCENDING => true
+        case Direction.DESCENDING => false
+        case _ => throw new TableException("SQL/Table does not support such sorting")
+      }
+    }
+
+    // one comparator per sort field, configured with the direction of the field
+    val fieldComps = sortFields.zip(sortDirections).map { case (fieldIdx, ascending) =>
+      FlinkTypeFactory.toTypeInfo(inputType.getFieldList.get(fieldIdx).getType) match {
+        case atomic: AtomicType[AnyRef] => atomic.createComparator(ascending, execConfig)
+        case other: TypeInformation[_] =>
+          throw new TableException(s"Unsupported field type $other to sort on.")
+      }
+    }
+
+    // not required because we only compare objects.
+    val noSerializers = new Array[TypeSerializer[AnyRef]](0)
+
+    new RowComparator(
+      new RowSchema(inputType).physicalArity,
+      sortFields.toArray,
+      fieldComps.toArray,
+      noSerializers,
+      sortDirections.toArray)
+  }
+
+  /**
+    * Returns the direction of the first sort field.
+    *
+    * @param collationSort The list of sort collations.
+    * @return The direction of the first sort field.
+    */
+  def getFirstSortDirection(collationSort: RelCollation): Direction = {
+    val firstCollation = collationSort.getFieldCollations.get(0)
+    firstCollation.direction
+  }
+
+  /**
+    * Returns the first sort field.
+    *
+    * @param collationSort The list of sort collations.
+    * @param rowType The row type of the input.
+    * @return The first sort field.
+    */
+  def getFirstSortField(collationSort: RelCollation, rowType: RelDataType): RelDataTypeField = {
+    val firstCollation = collationSort.getFieldCollations.get(0)
+    rowType.getFieldList.get(firstCollation.getFieldIndex)
+  }
+
+}
+
+/**
+  * Adapts a TypeComparator for Row to the java.util.Comparator interface, so that it can be
+  * used where a Java Comparator is expected (e.g. java.util.Collections.sort).
+  *
+  * @param rowComp the comparator every comparison is delegated to
+  */
+class CollectionRowComparator(
+    private val rowComp: TypeComparator[Row]) extends Comparator[Row] with Serializable {
+
+  override def compare(arg0: Row, arg1: Row): Int = rowComp.compare(arg0, arg1)
+}
+
+
+/**
+  * Identity map function: returns every incoming CRow unchanged.
+  */
+private[flink] class IdentityCRowMap extends MapFunction[CRow, CRow] {
+
+  override def map(value: CRow): CRow = value
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b8c8f204/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
index 397e72c..253d54a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.api.scala.stream.sql
import org.apache.flink.api.common.time.Time
import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala.stream.sql.OverWindowITCase.EventTimeSourceFunction
+import org.apache.flink.table.api.scala.stream.sql.TimeTestUtil.EventTimeSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.watermark.Watermark
@@ -862,19 +862,3 @@ class OverWindowITCase extends StreamingWithStateTestBase {
}
}
-
-object OverWindowITCase {
-
- class EventTimeSourceFunction[T](
- dataWithTimestampList: Seq[Either[(Long, T), Long]]) extends SourceFunction[T] {
- override def run(ctx: SourceContext[T]): Unit = {
- dataWithTimestampList.foreach {
- case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
- case Right(w) => ctx.emitWatermark(new Watermark(w))
- }
- }
-
- override def cancel(): Unit = ???
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/b8c8f204/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SortITCase.scala
new file mode 100644
index 0000000..2340591
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SortITCase.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.scala.stream.sql.SortITCase.StringRowSelectorSink
+import org.apache.flink.table.api.scala.stream.sql.TimeTestUtil.EventTimeSourceFunction
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+
+import org.junit.Assert._
+import org.junit._
+
+import scala.collection.mutable
+
+class SortITCase extends StreamingWithStateTestBase {
+
+  /**
+    * Runs an ORDER BY on the event-time attribute and a secondary field and checks the
+    * order in which the values of field b arrive at the sink.
+    *
+    * Rows are expected ascending on their timestamp; rows that share a timestamp are
+    * expected ascending on b.
+    */
+  @Test
+  def testEventTimeOrderBy(): Unit = {
+    // Left((timestamp, record)) is a record, Right(timestamp) is a watermark
+    val data = Seq(
+      Left((1500L, (1L, 15, "Hello"))),
+      Left((1600L, (1L, 16, "Hello"))),
+      Left((1000L, (1L, 1, "Hello"))),
+      Left((2000L, (2L, 2, "Hello"))),
+      Right(1000L),
+      Left((2000L, (2L, 2, "Hello"))),
+      Left((2000L, (2L, 3, "Hello"))),
+      Left((3000L, (3L, 3, "Hello"))),
+      Left((2000L, (3L, 1, "Hello"))),
+      Right(2000L),
+      Left((4000L, (4L, 4, "Hello"))),
+      Right(3000L),
+      Left((5000L, (5L, 5, "Hello"))),
+      Right(5000L),
+      Left((6000L, (6L, 65, "Hello"))),
+      Left((6000L, (6L, 6, "Hello"))),
+      Left((6000L, (6L, 67, "Hello"))),
+      Left((6000L, (6L, -1, "Hello"))),
+      Left((6000L, (6L, 6, "Hello"))),
+      Right(7000L),
+      Left((9000L, (6L, 9, "Hello"))),
+      Left((8500L, (6L, 18, "Hello"))),
+      Left((9000L, (6L, 7, "Hello"))),
+      Right(10000L),
+      Left((10000L, (7L, 7, "Hello World"))),
+      Left((11000L, (7L, 77, "Hello World"))),
+      Left((11000L, (7L, 17, "Hello World"))),
+      Right(12000L),
+      Left((14000L, (7L, 18, "Hello World"))),
+      Right(14000L),
+      Left((15000L, (8L, 8, "Hello World"))),
+      Right(17000L),
+      Left((20000L, (20L, 20, "Hello World"))),
+      Right(19000L))
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+    // The sink of this test collects into SortITCase.testResults, which StreamITCase.clear
+    // does not reset. Clear it explicitly so that results of an earlier run in the same JVM
+    // cannot leak into the assertion below.
+    SortITCase.testResults.clear()
+
+    val t1 = env.addSource(new EventTimeSourceFunction[(Long, Int, String)](data))
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+    tEnv.registerTable("T1", t1)
+
+    val sqlQuery = "SELECT b FROM T1 ORDER BY rowtime, b ASC "
+
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    // a single sink instance keeps the arrival order of the sorted rows
+    result.addSink(new StringRowSelectorSink(0)).setParallelism(1)
+    env.execute()
+
+    // one line per timestamp group, except the first line (timestamps 1000, 1500, 1600)
+    // and the lines "18", "7", "9" (8500, 9000) and "7", "17", "77" (10000, 11000)
+    val expected = mutable.MutableList(
+      "1", "15", "16",
+      "1", "2", "2", "3",
+      "3",
+      "4",
+      "5",
+      "-1", "6", "6", "65", "67",
+      "18", "7", "9",
+      "7", "17", "77",
+      "18",
+      "8",
+      "20")
+    assertEquals(expected, SortITCase.testResults)
+  }
+}
+
+object SortITCase {
+
+  /** Values collected by all StringRowSelectorSink instances, in arrival order. */
+  var testResults: mutable.MutableList[String] = mutable.MutableList.empty[String]
+
+  /**
+    * Sink that appends the string representation of a single field of every row to
+    * testResults.
+    *
+    * @param field position of the row field to collect
+    */
+  final class StringRowSelectorSink(private val field: Int) extends RichSinkFunction[Row]() {
+
+    def invoke(value: Row): Unit = {
+      val selected = value.getField(field).toString
+      // appends are guarded by the monitor of the result list
+      testResults.synchronized {
+        testResults += selected
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b8c8f204/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SortTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SortTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SortTest.scala
new file mode 100644
index 0000000..1d50fc1
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SortTest.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.junit.Test
+
+class SortTest extends TableTestBase {
+  private val streamUtil: StreamTableTestUtil = streamTestUtil()
+  streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c,
+    'proctime.proctime, 'rowtime.rowtime)
+
+  /** Sorting on proctime and a secondary field is translated into a DataStreamSort. */
+  @Test
+  def testSortProcessingTime(): Unit = {
+
+    val sortNode = unaryNode(
+      "DataStreamSort",
+      streamTableNode(0),
+      term("orderBy", "proctime ASC", "c ASC"))
+
+    val expectedPlan = unaryNode(
+      "DataStreamCalc",
+      sortNode,
+      term("select", "a", "TIME_MATERIALIZATION(proctime) AS proctime", "c"))
+
+    streamUtil.verifySql("SELECT a FROM MyTable ORDER BY proctime, c", expectedPlan)
+  }
+
+  /** Sorting on rowtime and a secondary field is translated into a DataStreamSort. */
+  @Test
+  def testSortRowTime(): Unit = {
+
+    val sortNode = unaryNode(
+      "DataStreamSort",
+      streamTableNode(0),
+      term("orderBy", "rowtime ASC, c ASC"))
+
+    val expectedPlan = unaryNode(
+      "DataStreamCalc",
+      sortNode,
+      term("select", "a", "TIME_MATERIALIZATION(rowtime) AS rowtime", "c"))
+
+    streamUtil.verifySql("SELECT a FROM MyTable ORDER BY rowtime, c", expectedPlan)
+  }
+
+  // test should fail because time order is descending
+  @Test(expected = classOf[TableException])
+  def testSortProcessingTimeDesc(): Unit = {
+    // the expected plan is irrelevant, a TableException is expected
+    streamUtil.verifySql("SELECT a FROM MyTable ORDER BY proctime DESC, c", "")
+  }
+
+  // test should fail because time is not the primary order field
+  @Test(expected = classOf[TableException])
+  def testSortProcessingTimeSecondaryField(): Unit = {
+    // the expected plan is irrelevant, a TableException is expected
+    streamUtil.verifySql("SELECT a FROM MyTable ORDER BY c, proctime", "")
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b8c8f204/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/TimeTestUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/TimeTestUtil.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/TimeTestUtil.scala
new file mode 100644
index 0000000..b883870
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/TimeTestUtil.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.watermark.Watermark
+
+object TimeTestUtil {
+
+  /**
+    * Test source that replays a fixed sequence of timestamped records and watermarks.
+    *
+    * @param dataWithTimestampList the elements to emit, in order: Left((timestamp, value))
+    *                              emits value with the given timestamp, Right(timestamp)
+    *                              emits a watermark with the given timestamp
+    */
+  class EventTimeSourceFunction[T](
+      dataWithTimestampList: Seq[Either[(Long, T), Long]]) extends SourceFunction[T] {
+
+    // Cleared by cancel() to stop the emission loop. Volatile because cancel() may be
+    // invoked from a different thread than run().
+    @volatile private var running = true
+
+    /** Emits the configured elements in order until all are emitted or the source is cancelled. */
+    override def run(ctx: SourceContext[T]): Unit = {
+      val elements = dataWithTimestampList.iterator
+      while (running && elements.hasNext) {
+        elements.next() match {
+          case Left((timestamp, value)) => ctx.collectWithTimestamp(value, timestamp)
+          case Right(watermark) => ctx.emitWatermark(new Watermark(watermark))
+        }
+      }
+    }
+
+    /**
+      * Stops the emission loop of run().
+      *
+      * The method was previously implemented as `???`, which throws a NotImplementedError
+      * whenever the source is cancelled.
+      */
+    override def cancel(): Unit = {
+      running = false
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b8c8f204/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/TimeSortProcessFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/TimeSortProcessFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/TimeSortProcessFunctionTest.scala
new file mode 100644
index 0000000..df4e1aa
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/TimeSortProcessFunctionTest.scala
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.lang.{Integer => JInt, Long => JLong}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.streaming.util.{KeyedOneInputStreamOperatorTestHarness, TestHarnessUtil}
+import org.apache.flink.types.Row
+import org.junit.Test
+import org.apache.flink.table.runtime.aggregate.TimeSortProcessFunctionTest._
+import org.apache.flink.api.java.typeutils.runtime.RowComparator
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.common.typeutils.TypeComparator
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.streaming.api.TimeCharacteristic
+
+/**
+ * Harness tests for ProcTimeSortProcessFunction and RowTimeSortProcessFunction.
+ *
+ * Both tests key the input on field 0 and sort with a comparator that orders rows
+ * ascending on field 1 and descending on field 2.
+ */
+class TimeSortProcessFunctionTest {
+
+
+ /**
+ * Checks that rows processed at the same processing time are emitted together, sorted by
+ * the comparator, once the processing time advances.
+ */
+ @Test
+ def testSortProcTimeHarnessPartitioned(): Unit = {
+
+ val rT = new RowTypeInfo(Array[TypeInformation[_]](
+ INT_TYPE_INFO,
+ LONG_TYPE_INFO,
+ INT_TYPE_INFO,
+ STRING_TYPE_INFO,
+ LONG_TYPE_INFO),
+ Array("a", "b", "c", "d", "e"))
+
+ // sort on field 1 (ascending) and field 2 (descending)
+ val indexes = Array(1, 2)
+
+ val fieldComps = Array[TypeComparator[AnyRef]](
+ LONG_TYPE_INFO.createComparator(true, null).asInstanceOf[TypeComparator[AnyRef]],
+ INT_TYPE_INFO.createComparator(false, null).asInstanceOf[TypeComparator[AnyRef]] )
+ val booleanOrders = Array(true, false)
+
+
+ val rowComp = new RowComparator(
+ rT.getTotalFields,
+ indexes,
+ fieldComps,
+ new Array[TypeSerializer[AnyRef]](0), //used only for serialized comparisons
+ booleanOrders)
+
+ val collectionRowComparator = new CollectionRowComparator(rowComp)
+
+ val inputCRowType = CRowTypeInfo(rT)
+
+ val processFunction = new KeyedProcessOperator[Integer,CRow,CRow](
+ new ProcTimeSortProcessFunction(
+ inputCRowType,
+ collectionRowComparator))
+
+ val testHarness = new KeyedOneInputStreamOperatorTestHarness[Integer,CRow,CRow](
+ processFunction,
+ new TupleRowSelector(0),
+ BasicTypeInfo.INT_TYPE_INFO)
+
+ testHarness.open()
+
+ testHarness.setProcessingTime(3)
+
+ // timestamp is ignored in processing time
+ testHarness.processElement(new StreamRecord(new CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 11L: JLong), true), 1001))
+ testHarness.processElement(new StreamRecord(new CRow(
+ Row.of(1: JInt, 12L: JLong, 1: JInt, "aaa", 11L: JLong), true), 2002))
+ testHarness.processElement(new StreamRecord(new CRow(
+ Row.of(1: JInt, 12L: JLong, 2: JInt, "aaa", 11L: JLong), true), 2003))
+ testHarness.processElement(new StreamRecord(new CRow(
+ Row.of(1: JInt, 12L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2004))
+ testHarness.processElement(new StreamRecord(new CRow(
+ Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2006))
+
+ //move the timestamp to ensure the execution
+ testHarness.setProcessingTime(1005)
+
+ testHarness.processElement(new StreamRecord(new CRow(
+ Row.of(1: JInt, 1L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2007))
+ testHarness.processElement(new StreamRecord(new CRow(
+ Row.of(1: JInt, 3L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2007))
+ testHarness.processElement(new StreamRecord(new CRow(
+ Row.of(1: JInt, 2L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2007))
+
+ testHarness.setProcessingTime(1008)
+
+ val result = testHarness.getOutput
+
+ val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+ // all elements processed at the same processing time are emitted together
+ // elements should be sorted ascending on field 1 and descending on field 2
+ // first batch (fields 1 and 2): (10,0) (11,1) (12,2) (12,1) (12,0)
+ // second batch (fields 1 and 2): (1,0) (2,0) (3,0)
+
+ expectedOutput.add(new StreamRecord(new CRow(
+ Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", 11L: JLong),true), 4))
+ expectedOutput.add(new StreamRecord(new CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 11L: JLong),true), 4))
+ expectedOutput.add(new StreamRecord(new CRow(
+ Row.of(1: JInt, 12L: JLong, 2: JInt, "aaa", 11L: JLong),true), 4))
+ expectedOutput.add(new StreamRecord(new CRow(
+ Row.of(1: JInt, 12L: JLong, 1: JInt, "aaa", 11L: JLong),true), 4))
+ expectedOutput.add(new StreamRecord(new CRow(
+ Row.of(1: JInt, 12L: JLong, 0: JInt, "aaa", 11L: JLong),true), 4))
+
+ expectedOutput.add(new StreamRecord(new CRow(
+ Row.of(1: JInt, 1L: JLong, 0: JInt, "aaa", 11L: JLong),true), 1006))
+ expectedOutput.add(new StreamRecord(new CRow(
+ Row.of(1: JInt, 2L: JLong, 0: JInt, "aaa", 11L: JLong),true), 1006))
+ expectedOutput.add(new StreamRecord(new CRow(
+ Row.of(1: JInt, 3L: JLong, 0: JInt, "aaa", 11L: JLong),true), 1006))
+
+ TestHarnessUtil.assertOutputEquals("Output was not correctly sorted.", expectedOutput, result)
+
+ testHarness.close()
+ }
+
+ /**
+ * Checks that rows are emitted in order of their event-time timestamp when the watermark
+ * advances, that rows with the same timestamp are sorted by the comparator, and that a
+ * row arriving for an already emitted timestamp is dropped.
+ */
+ @Test
+ def testSortRowTimeHarnessPartitioned(): Unit = {
+
+ val rT = new RowTypeInfo(Array[TypeInformation[_]](
+ INT_TYPE_INFO,
+ LONG_TYPE_INFO,
+ INT_TYPE_INFO,
+ STRING_TYPE_INFO,
+ LONG_TYPE_INFO),
+ Array("a", "b", "c", "d", "e"))
+
+ // sort on field 1 (ascending) and field 2 (descending)
+ val indexes = Array(1, 2)
+
+ val fieldComps = Array[TypeComparator[AnyRef]](
+ LONG_TYPE_INFO.createComparator(true, null).asInstanceOf[TypeComparator[AnyRef]],
+ INT_TYPE_INFO.createComparator(false, null).asInstanceOf[TypeComparator[AnyRef]] )
+ val booleanOrders = Array(true, false)
+
+ val rowComp = new RowComparator(
+ rT.getTotalFields,
+ indexes,
+ fieldComps,
+ new Array[TypeSerializer[AnyRef]](0), //used only for serialized comparisons
+ booleanOrders)
+
+ val collectionRowComparator = new CollectionRowComparator(rowComp)
+
+ val inputCRowType = CRowTypeInfo(rT)
+
+ val processFunction = new KeyedProcessOperator[Integer,CRow,CRow](
+ new RowTimeSortProcessFunction(
+ inputCRowType,
+ Some(collectionRowComparator)))
+
+ val testHarness = new KeyedOneInputStreamOperatorTestHarness[Integer, CRow, CRow](
+ processFunction,
+ new TupleRowSelector(0),
+ BasicTypeInfo.INT_TYPE_INFO)
+
+ testHarness.open()
+
+ testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime)
+ testHarness.processWatermark(3)
+
+ // in event time the timestamp of the stream record determines the emission order
+ testHarness.processElement(new StreamRecord(new CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 11L: JLong), true), 1001))
+ testHarness.processElement(new StreamRecord(new CRow(
+ Row.of(1: JInt, 12L: JLong, 1: JInt, "aaa", 11L: JLong), true), 2002))
+ testHarness.processElement(new StreamRecord(new CRow(
+ Row.of(1: JInt, 13L: JLong, 2: JInt, "aaa", 11L: JLong), true), 2002))
+ testHarness.processElement(new StreamRecord(new CRow(
+ Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", 11L: JLong), true), 2002))
+ testHarness.processElement(new StreamRecord(new CRow(
+ Row.of(1: JInt, 14L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2002))
+ testHarness.processElement(new StreamRecord(new CRow(
+ Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", 11L: JLong), true), 2004))
+ testHarness.processElement(new StreamRecord(new CRow(
+ Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2006))
+
+ // move watermark forward
+ testHarness.processWatermark(2007)
+
+ testHarness.processElement(new StreamRecord(new CRow(
+ Row.of(1: JInt, 20L: JLong, 1: JInt, "aaa", 11L: JLong), true), 2008))
+ testHarness.processElement(new StreamRecord(new CRow(
+ Row.of(1: JInt, 14L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2002)) // too late
+ testHarness.processElement(new StreamRecord(new CRow(
+ Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", 11L: JLong), true), 2019)) // too early
+ testHarness.processElement(new StreamRecord(new CRow(
+ Row.of(1: JInt, 20L: JLong, 2: JInt, "aaa", 11L: JLong), true), 2008))
+ testHarness.processElement(new StreamRecord(new CRow(
+ Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2010))
+ testHarness.processElement(new StreamRecord(new CRow(
+ Row.of(1: JInt, 19L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2008))
+
+ // move watermark forward
+ testHarness.processWatermark(2012)
+
+ val result = testHarness.getOutput
+
+ val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+ // elements are emitted ascending on their timestamp, followed by the watermark
+ // elements with the same timestamp are sorted ascending on field 1 and descending on field 2
+ // the late element (timestamp 2002) and the early element (timestamp 2019) are not expected
+ expectedOutput.add(new Watermark(3))
+ expectedOutput.add(new StreamRecord(new CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 11L: JLong),true), 1001))
+ expectedOutput.add(new StreamRecord(new CRow(
+ Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", 11L: JLong),true), 2002))
+ expectedOutput.add(new StreamRecord(new CRow(
+ Row.of(1: JInt, 12L: JLong, 1: JInt, "aaa", 11L: JLong),true), 2002))
+ expectedOutput.add(new StreamRecord(new CRow(
+ Row.of(1: JInt, 13L: JLong, 2: JInt, "aaa", 11L: JLong),true), 2002))
+ expectedOutput.add(new StreamRecord(new CRow(
+ Row.of(1: JInt, 14L: JLong, 0: JInt, "aaa", 11L: JLong),true), 2002))
+ expectedOutput.add(new StreamRecord(new CRow(
+ Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", 11L: JLong),true), 2004))
+ expectedOutput.add(new StreamRecord(new CRow(
+ Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", 11L: JLong),true), 2006))
+ expectedOutput.add(new Watermark(2007))
+
+ expectedOutput.add(new StreamRecord(new CRow(
+ Row.of(1: JInt, 19L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2008))
+ expectedOutput.add(new StreamRecord(new CRow(
+ Row.of(1: JInt, 20L: JLong, 2: JInt, "aaa", 11L: JLong), true), 2008))
+ expectedOutput.add(new StreamRecord(new CRow(
+ Row.of(1: JInt, 20L: JLong, 1: JInt, "aaa", 11L: JLong), true), 2008))
+ expectedOutput.add(new StreamRecord(new CRow(
+ Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2010))
+
+ expectedOutput.add(new Watermark(2012))
+
+ TestHarnessUtil.assertOutputEquals("Output was not correctly sorted.", expectedOutput, result)
+
+ testHarness.close()
+
+ }
+}
+
+object TimeSortProcessFunctionTest {
+
+  /**
+    * Key selector for tests that uses one field of the row wrapped by a CRow as key.
+    *
+    * @param selectorField position of the key field in the row; the field value is cast to
+    *                      Integer
+    */
+  class TupleRowSelector(private val selectorField: Int) extends KeySelector[CRow, Integer] {
+
+    override def getKey(value: CRow): Integer = {
+      val keyField = value.row.getField(selectorField)
+      keyField.asInstanceOf[Integer]
+    }
+  }
+
+}