You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/04/29 09:10:39 UTC
flink git commit: [FLINK-2946] Add support for orderBy() to batch Table API and SQL.
Repository: flink
Updated Branches:
refs/heads/master c81151e4e -> a5ad7d913
[FLINK-2946] Add support for orderBy() to batch Table API and SQL.
This closes #1926.
Closed because CsvTableSource has been implemented.
This closes #939.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a5ad7d91
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a5ad7d91
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a5ad7d91
Branch: refs/heads/master
Commit: a5ad7d913c4626c355ecc0a8d25f5b9a8c777a85
Parents: c81151e
Author: dawidwys <wy...@gmail.com>
Authored: Fri Apr 22 21:31:28 2016 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Apr 28 23:59:28 2016 +0200
----------------------------------------------------------------------
.../flink/api/scala/table/expressionDsl.scala | 3 +
.../table/expressions/ExpressionParser.scala | 12 +-
.../flink/api/table/expressions/ordering.scala | 43 ++++++
.../table/plan/nodes/dataset/BatchScan.scala | 27 +---
.../plan/nodes/dataset/DataSetAggregate.scala | 35 +----
.../table/plan/nodes/dataset/DataSetRel.scala | 39 ++++-
.../table/plan/nodes/dataset/DataSetSort.scala | 127 ++++++++++++++++
.../api/table/plan/rules/FlinkRuleSets.scala | 1 +
.../plan/rules/dataSet/DataSetSortRule.scala | 61 ++++++++
.../org/apache/flink/api/table/table.scala | 51 ++++++-
.../flink/api/scala/table/test/SortITCase.scala | 147 +++++++++++++++++++
.../apache/flink/test/util/TestBaseUtils.java | 21 ++-
12 files changed, 505 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a5ad7d91/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
index c6f14f3..c4bc1be 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
@@ -65,6 +65,9 @@ trait ImplicitExpressionOperations {
def as(name: Symbol) = Naming(expr, name.name)
+ /** Wraps this expression in an [[Asc]] (ascending) sort marker, e.g. for `Table.orderBy`. */
+ def asc = Asc(expr)
+ /** Wraps this expression in a [[Desc]] (descending) sort marker, e.g. for `Table.orderBy`. */
+ def desc = Desc(expr)
+
/**
* Conditional operator that decides which of two other expressions should be evaluated
* based on a evaluated boolean condition.
http://git-wip-us.apache.org/repos/asf/flink/blob/a5ad7d91/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
index a20a8e9..ffadca5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
@@ -52,6 +52,8 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
lazy val CAST: Keyword = Keyword("cast")
lazy val NULL: Keyword = Keyword("Null")
lazy val EVAL: Keyword = Keyword("eval")
+ // Keywords for the ordering suffixes parsed by suffixAsc / suffixDesc.
+ lazy val ASC: Keyword = Keyword("asc")
+ lazy val DESC: Keyword = Keyword("desc")
def functionIdent: ExpressionParser.Parser[String] =
not(AS) ~ not(COUNT) ~ not(AVG) ~ not(MIN) ~ not(MAX) ~
@@ -124,6 +126,13 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
lazy val suffixIsNotNull: PackratParser[Expression] =
composite <~ "." ~ IS_NOT_NULL ~ opt("()") ^^ { e => IsNotNull(e) }
+ // Ascending-order suffix on an atom: either the method form ("a.asc") or the ASC keyword
+ // following the atom. Both alternatives produce an Asc expression.
+ lazy val suffixAsc : PackratParser[Expression] =
+ (atom <~ ".asc" ^^ { e => Asc(e) }) | (atom <~ ASC ^^ { e => Asc(e) })
+
+ // Descending-order suffix: "a.desc" or the DESC keyword after the atom (as in the
+ // orderBy("name DESC") example). Both alternatives produce a Desc expression.
+ lazy val suffixDesc : PackratParser[Expression] =
+ (atom <~ ".desc" ^^ { e => Desc(e) }) | (atom <~ DESC ^^ { e => Desc(e) })
+
+
lazy val suffixSum: PackratParser[Expression] =
composite <~ "." ~ SUM ~ opt("()") ^^ { e => Sum(e) }
@@ -181,7 +190,8 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
lazy val suffixed: PackratParser[Expression] =
suffixIsNull | suffixIsNotNull | suffixSum | suffixMin | suffixMax | suffixCount | suffixAvg |
- suffixCast | suffixAs | suffixTrim | suffixTrimWithoutArgs | suffixEval | suffixFunctionCall
+ suffixCast | suffixAs | suffixTrim | suffixTrimWithoutArgs | suffixEval | suffixFunctionCall |
+ suffixAsc | suffixDesc
// prefix operators
http://git-wip-us.apache.org/repos/asf/flink/blob/a5ad7d91/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala
new file mode 100644
index 0000000..75fa078
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.expressions
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+
+/**
+ * Common base of the sort-direction markers [[Asc]] and [[Desc]]. Each marker wraps exactly
+ * one child expression; Table.orderBy accepts an Ordering only if that child is a field
+ * reference.
+ */
+abstract class Ordering extends UnaryExpression { self: Product => }
+
+/** Ascending sort marker; its RexNode is simply the child's RexNode. */
+case class Asc(child: Expression) extends Ordering {
+  override def toString: String = "(" + child + ").asc"
+
+  override def name: String = s"${child.name}-asc"
+
+  // No wrapping call: presumably RelBuilder treats a bare sort key as ascending.
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = child.toRexNode(relBuilder)
+}
+
+/** Descending sort marker; wraps the child's RexNode with RelBuilder.desc. */
+case class Desc(child: Expression) extends Ordering {
+  override def toString: String = "(" + child + ").desc"
+
+  override def name: String = s"${child.name}-desc"
+
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.desc(child.toRexNode(relBuilder))
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a5ad7d91/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala
index 715109e..b18d674 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala
@@ -81,31 +81,14 @@ abstract class BatchScan(
// conversion
if (determinedType != inputType) {
- val generator = new CodeGenerator(
- config,
- input.getType,
- flinkTable.fieldIndexes)
- val conversion = generator.generateConverterResultExpression(
+ val mapFunc = getConversionMapper(
+ config,
+ inputType,
determinedType,
- getRowType.getFieldNames)
-
- val body =
- s"""
- |${conversion.code}
- |return ${conversion.resultTerm};
- |""".stripMargin
-
- val genFunction = generator.generateFunction(
"DataSetSourceConversion",
- classOf[MapFunction[Any, Any]],
- body,
- determinedType)
-
- val mapFunc = new MapRunner[Any, Any](
- genFunction.name,
- genFunction.code,
- genFunction.returnType)
+ getRowType.getFieldNames,
+ Some(flinkTable.fieldIndexes))
val opName = s"from: (${rowType.getFieldNames.asScala.toList.mkString(", ")})"
http://git-wip-us.apache.org/repos/asf/flink/blob/a5ad7d91/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
index 12095a2..114122b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
@@ -143,7 +143,12 @@ class DataSetAggregate(
expectedType match {
case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
val mapName = s"convert: (${rowType.getFieldNames.asScala.toList.mkString(", ")})"
- result.map(typeConversion(config, rowTypeInfo, expectedType.get))
+ result.map(getConversionMapper(config,
+ rowTypeInfo.asInstanceOf[TypeInformation[Any]],
+ expectedType.get,
+ "AggregateOutputConversion",
+ rowType.getFieldNames.asScala
+ ))
.name(mapName)
case _ => result
}
@@ -180,32 +185,4 @@ class DataSetAggregate(
}.mkString(", ")
}
- private def typeConversion(
- config: TableConfig,
- rowTypeInfo: RowTypeInfo,
- expectedType: TypeInformation[Any]): MapFunction[Any, Any] = {
-
- val generator = new CodeGenerator(config, rowTypeInfo.asInstanceOf[TypeInformation[Any]])
- val conversion = generator.generateConverterResultExpression(
- expectedType, rowType.getFieldNames.asScala)
-
- val body =
- s"""
- |${conversion.code}
- |return ${conversion.resultTerm};
- |""".stripMargin
-
- val genFunction = generator.generateFunction(
- "AggregateOutputConversion",
- classOf[MapFunction[Any, Any]],
- body,
- expectedType)
-
- new MapRunner[Any, Any](
- genFunction.name,
- genFunction.code,
- genFunction.returnType)
-
- }
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a5ad7d91/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
index e8f81fd..7c76e46 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
@@ -21,10 +21,13 @@ package org.apache.flink.api.table.plan.nodes.dataset
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.{BatchTableEnvironment, TableEnvironment, TableConfig}
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig, TableEnvironment}
import org.apache.flink.api.table.plan.nodes.FlinkRel
+import org.apache.flink.api.table.runtime.MapRunner
import scala.collection.JavaConversions._
@@ -64,4 +67,38 @@ trait DataSetRel extends RelNode with FlinkRel {
}
+ /**
+ * Builds a MapFunction that converts records of `inputType` into `expectedType`, using code
+ * generated by [[CodeGenerator]] and wrapped in a [[MapRunner]].
+ *
+ * Shared by BatchScan, DataSetAggregate and DataSetSort for their output conversions.
+ *
+ * @param config table config handed to the code generator
+ * @param inputType type of the records to convert
+ * @param expectedType target type of the conversion; also the generated function's return type
+ * @param conversionOperatorName name given to the generated function
+ * @param fieldNames field names passed to generateConverterResultExpression
+ * @param inputPojoFieldMapping optional field index mapping forwarded to the CodeGenerator
+ *                              (BatchScan passes the table's fieldIndexes; others omit it)
+ * @return the generated conversion as a MapFunction[Any, Any]
+ */
+ private[dataset] def getConversionMapper(
+ config: TableConfig,
+ inputType: TypeInformation[Any],
+ expectedType: TypeInformation[Any],
+ conversionOperatorName: String,
+ fieldNames: Seq[String],
+ inputPojoFieldMapping: Option[Array[Int]] = None): MapFunction[Any, Any] = {
+
+ val generator = new CodeGenerator(
+ config,
+ inputType,
+ None,
+ inputPojoFieldMapping)
+ val conversion = generator.generateConverterResultExpression(expectedType, fieldNames)
+
+ // Body of the generated map method: run the conversion code, then return its result term.
+ val body =
+ s"""
+ |${conversion.code}
+ |return ${conversion.resultTerm};
+ |""".stripMargin
+
+ val genFunction = generator.generateFunction(
+ conversionOperatorName,
+ classOf[MapFunction[Any, Any]],
+ body,
+ expectedType)
+
+ // MapRunner receives the generated class name, source code and return type
+ // (presumably it compiles the code lazily at runtime — confirm in MapRunner).
+ new MapRunner[Any, Any](
+ genFunction.name,
+ genFunction.code,
+ genFunction.returnType)
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a5ad7d91/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
new file mode 100644
index 0000000..ef89b06
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.java.typeutils.PojoTypeInfo
+import org.apache.flink.api.table.BatchTableEnvironment
+import org.apache.flink.api.table.typeutils.TypeConverter._
+
+import scala.collection.JavaConverters._
+
+/**
+ * Flink RelNode that sorts its input DataSet according to `collations`.
+ *
+ * Unless the execution environment runs with parallelism 1, the input is first range-partitioned
+ * on the sort fields. Every partition is then sorted locally on all sort fields, in collation
+ * order.
+ */
+class DataSetSort(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inp: RelNode,
+    collations: RelCollation,
+    rowType2: RelDataType)
+  extends SingleRel(cluster, traitSet, inp)
+  with DataSetRel {
+
+  /** Sort fields as (field index, Flink sort order) pairs, in collation order. */
+  private val fieldCollations: Seq[(Int, Order)] = collations.getFieldCollations.asScala
+    .map(c => (c.getFieldIndex, directionToOrder(c.getDirection)))
+
+  /** Human-readable sort specification, e.g. "a ASC, b DESC", used by toString/explain. */
+  private val sortFieldsToString: String = fieldCollations
+    .map { case (field, order) => s"${rowType2.getFieldNames.get(field)} ${order.getShortName}" }
+    .mkString(", ")
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
+    new DataSetSort(cluster, traitSet, inputs.get(0), collations, rowType2)
+  }
+
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]] = None): DataSet[Any] = {
+
+    val config = tableEnv.getConfig
+
+    // Use getInput rather than the constructor argument so that an input replaced by the
+    // planner (SingleRel.replaceInput) is the one that gets translated.
+    val inputDs = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+
+    // Range-partition on the sort keys; with parallelism 1 there is a single partition and
+    // no repartitioning is needed.
+    val partitionedDs: DataSet[Any] =
+      if (inputDs.getExecutionEnvironment.getParallelism == 1) {
+        inputDs
+      } else {
+        inputDs.partitionByRange(fieldCollations.map(_._1): _*)
+          .withOrders(fieldCollations.map(_._2): _*)
+      }
+
+    // Sort every partition locally on all sort fields, in collation order.
+    val sortedDs = fieldCollations.foldLeft(partitionedDs) {
+      case (ds, (field, order)) => ds.sortPartition(field, order)
+    }
+
+    val inputType = sortedDs.getType
+    expectedType match {
+
+      case None if config.getEfficientTypeUsage =>
+        sortedDs
+
+      case _ =>
+        val determinedType = determineReturnType(
+          getRowType,
+          expectedType,
+          config.getNullCheck,
+          config.getEfficientTypeUsage)
+
+        if (determinedType != inputType) {
+          val fieldNames = getRowType.getFieldNames.asScala
+          val mapFunc = getConversionMapper(
+            config,
+            inputType,
+            determinedType,
+            "DataSetSortConversion",
+            fieldNames)
+          // Name the conversion operator like DataSetAggregate does for its conversion map.
+          sortedDs.map(mapFunc).name(s"convert: (${fieldNames.toList.mkString(", ")})")
+        } else {
+          // no conversion necessary, forward
+          sortedDs
+        }
+    }
+  }
+
+  /**
+   * Maps a Calcite sort direction to the Flink sort order.
+   *
+   * @throws UnsupportedOperationException for directions without a Flink equivalent
+   *                                       (e.g. CLUSTERED)
+   */
+  private def directionToOrder(direction: Direction): Order = direction match {
+    case Direction.ASCENDING | Direction.STRICTLY_ASCENDING => Order.ASCENDING
+    case Direction.DESCENDING | Direction.STRICTLY_DESCENDING => Order.DESCENDING
+    case other =>
+      throw new UnsupportedOperationException(s"Unsupported sort direction: $other")
+  }
+
+  override def toString: String = s"Sort(by: $sortFieldsToString)"
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw)
+      .item("orderBy", sortFieldsToString)
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a5ad7d91/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
index 06a8a84..5d5912b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
@@ -102,6 +102,7 @@ object FlinkRuleSets {
DataSetJoinRule.INSTANCE,
DataSetScanRule.INSTANCE,
DataSetUnionRule.INSTANCE,
+ DataSetSortRule.INSTANCE,
DataSetValuesRule.INSTANCE,
BatchTableSourceScanRule.INSTANCE
)
http://git-wip-us.apache.org/repos/asf/flink/blob/a5ad7d91/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala
new file mode 100644
index 0000000..b7f70e3
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.logical.{LogicalJoin, LogicalSort}
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetSort}
+
+/**
+ * Converts a [[LogicalSort]] into a [[DataSetSort]] in the DataSet convention.
+ */
+class DataSetSortRule
+  extends ConverterRule(
+    classOf[LogicalSort],
+    Convention.NONE,
+    DataSetConvention.INSTANCE,
+    "DataSetSortRule") {
+
+  /**
+   * Fires only for a plain sort. A LogicalSort that carries an OFFSET or a FETCH (LIMIT)
+   * is not handled by this rule.
+   */
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val logicalSort = call.rel(0).asInstanceOf[LogicalSort]
+    val hasOffset = logicalSort.offset != null
+    val hasFetch = logicalSort.fetch != null
+    !(hasOffset || hasFetch)
+  }
+
+  override def convert(rel: RelNode): RelNode = {
+    val logicalSort = rel.asInstanceOf[LogicalSort]
+    val dataSetTraits = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+    val dataSetInput = RelOptRule.convert(logicalSort.getInput, DataSetConvention.INSTANCE)
+
+    new DataSetSort(
+      rel.getCluster,
+      dataSetTraits,
+      dataSetInput,
+      logicalSort.getCollation,
+      rel.getRowType)
+  }
+}
+
+/** Holder of the shared rule instance referenced from FlinkRuleSets. */
+object DataSetSortRule {
+ val INSTANCE: RelOptRule = new DataSetSortRule
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a5ad7d91/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
index 6485139..68e1041 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
@@ -21,14 +21,13 @@ import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataTypeField
import org.apache.calcite.rel.core.JoinRelType
import org.apache.calcite.rel.logical.LogicalProject
-import org.apache.calcite.rex.{RexInputRef, RexLiteral, RexCall, RexNode}
+import org.apache.calcite.rex.{RexCall, RexInputRef, RexLiteral, RexNode}
import org.apache.calcite.sql.SqlKind
import org.apache.calcite.tools.RelBuilder.{AggCall, GroupKey}
import org.apache.calcite.util.NlsString
import org.apache.flink.api.table.plan.PlanGenException
import org.apache.flink.api.table.plan.RexNodeTranslator.extractAggCalls
-import org.apache.flink.api.table.expressions.{ExpressionParser, Naming,
- UnresolvedFieldReference, Expression}
+import org.apache.flink.api.table.expressions._
import scala.collection.mutable
import scala.collection.JavaConverters._
@@ -209,7 +208,7 @@ class Table(
relBuilder.push(relNode)
relBuilder.filter(predicate.toRexNode(relBuilder))
-
+
new Table(relBuilder.build(), tableEnv)
}
@@ -401,6 +400,50 @@ class Table(
new Table(relBuilder.build(), tableEnv)
}
+  /**
+   * Sorts the given [[Table]]. Similar to SQL ORDER BY.
+   * The resulting Table is globally sorted across all parallel partitions.
+   *
+   * Each expression must be a field reference, optionally wrapped in `asc` or `desc`.
+   *
+   * Example:
+   *
+   * {{{
+   *   tab.orderBy('name.desc)
+   * }}}
+   *
+   * @throws IllegalArgumentException if an expression is neither a field reference nor an
+   *                                  asc/desc-wrapped field reference
+   */
+  def orderBy(fields: Expression*): Table = {
+    val onlyFieldReferences = fields.forall {
+      case _: UnresolvedFieldReference => true
+      case o: Ordering => o.child.isInstanceOf[UnresolvedFieldReference]
+      case _ => false
+    }
+    if (!onlyFieldReferences) {
+      throw new IllegalArgumentException("All expressions must be field references " +
+        "or asc/desc expressions.")
+    }
+
+    // Push only after validation: throwing after the push would leave relNode on
+    // relBuilder's stack for whatever operation uses the builder next.
+    relBuilder.push(relNode)
+    val exprs = fields.map(_.toRexNode(relBuilder))
+    relBuilder.sort(exprs.asJava)
+    new Table(relBuilder.build(), tableEnv)
+  }
+
+  /**
+   * Sorts the given [[Table]]. Similar to SQL ORDER BY.
+   * The resulting Table is globally sorted across all parallel partitions.
+   *
+   * `fields` is parsed with [[ExpressionParser.parseExpressionList]]; every parsed expression
+   * must be a field reference, optionally followed by an asc/desc suffix.
+   *
+   * Example:
+   *
+   * {{{
+   *   tab.orderBy("name DESC")
+   * }}}
+   */
+  def orderBy(fields: String): Table = {
+    val parsedFields = ExpressionParser.parseExpressionList(fields)
+    orderBy(parsedFields: _*)
+  }
+
private def createRenamingProject(exprs: Seq[RexNode]): LogicalProject = {
val names = exprs.map{ e =>
http://git-wip-us.apache.org/repos/asf/flink/blob/a5ad7d91/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala
new file mode 100644
index 0000000..94361c6
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table.test
+
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.api.scala.{ExecutionEnvironment, _}
+import org.apache.flink.api.table.{Row, TableEnvironment}
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+/**
+ * Integration tests for sorting via Table API `orderBy` and SQL ORDER BY.
+ *
+ * Each test runs with parallelism 4. It collects every partition separately, orders the
+ * non-empty partitions by their first row and concatenates them. The result is then compared,
+ * in order, with the expected tuples sorted by the test's `rowOrdering`.
+ */
+@RunWith(classOf[Parameterized])
+class SortITCase(
+    mode: TestExecutionMode,
+    configMode: TableConfigMode)
+  extends TableProgramsTestBase(mode, configMode) {
+
+  def getExecutionEnvironment: ExecutionEnvironment = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(4)
+    env
+  }
+
+  /** Expected contents of CollectionDataSets.get3TupleDataSet, before sorting. */
+  val tupleDataSetStrings = List((1, 1L, "Hi")
+    ,(2, 2L, "Hello")
+    ,(3, 2L, "Hello world")
+    ,(4, 3L, "Hello world, how are you?")
+    ,(5, 3L, "I am fine.")
+    ,(6, 3L, "Luke Skywalker")
+    ,(7, 4L, "Comment#1")
+    ,(8, 4L, "Comment#2")
+    ,(9, 4L, "Comment#3")
+    ,(10, 4L, "Comment#4")
+    ,(11, 5L, "Comment#5")
+    ,(12, 5L, "Comment#6")
+    ,(13, 5L, "Comment#7")
+    ,(14, 5L, "Comment#8")
+    ,(15, 5L, "Comment#9")
+    ,(16, 6L, "Comment#10")
+    ,(17, 6L, "Comment#11")
+    ,(18, 6L, "Comment#12")
+    ,(19, 6L, "Comment#13")
+    ,(20, 6L, "Comment#14")
+    ,(21, 6L, "Comment#15"))
+
+  @Test
+  def testOrderByDesc(): Unit = {
+    val env = getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    val t = ds.toTable(tEnv).orderBy('_1.desc)
+    implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+      -x.productElement(0).asInstanceOf[Int])
+
+    val expected = sortExpectedly(tupleDataSetStrings)
+    val result = collectInPartitionOrder(t.toDataSet[Row])
+
+    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
+  }
+
+  @Test
+  def testOrderByAsc(): Unit = {
+    val env = getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    val t = ds.toTable(tEnv).orderBy('_1.asc)
+    implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+      x.productElement(0).asInstanceOf[Int])
+
+    val expected = sortExpectedly(tupleDataSetStrings)
+    val result = collectInPartitionOrder(t.toDataSet[Row])
+
+    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
+  }
+
+  @Test
+  def testOrderByMultipleFieldsDifferentDirections(): Unit = {
+    val env = getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    val t = ds.toTable(tEnv).orderBy('_1.asc, '_2.desc)
+    implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+      (x.productElement(0).asInstanceOf[Int], -x.productElement(1).asInstanceOf[Long]))
+
+    val expected = sortExpectedly(tupleDataSetStrings)
+    val result = collectInPartitionOrder(t.toDataSet[Row])
+
+    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
+  }
+
+  @Test
+  def testOrderByMultipleFieldsWithSql(): Unit = {
+    val env = getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 DESC, _2 DESC"
+    implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+      (-x.productElement(0).asInstanceOf[Int], -x.productElement(1).asInstanceOf[Long]))
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    tEnv.registerDataSet("MyTable", ds)
+
+    val expected = sortExpectedly(tupleDataSetStrings)
+    val result = collectInPartitionOrder(tEnv.sql(sqlQuery).toDataSet[Row])
+
+    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
+  }
+
+  /**
+   * Collects `rows` partition by partition and concatenates the non-empty partitions ordered
+   * by their first row under `ordering`.
+   *
+   * Each partition is materialized eagerly with toList, because Iterator.toSeq would be a lazy
+   * Stream over the input iterator. flatten returns an empty result when every partition is
+   * empty, whereas reduceLeft would throw.
+   */
+  private def collectInPartitionOrder(rows: DataSet[Row])
+      (implicit ordering: Ordering[Row]): Seq[Row] = {
+    val partitions = rows.mapPartition(partition => Seq[Seq[Row]](partition.toList)).collect()
+    partitions.filterNot(_.isEmpty).sortBy(_.head).flatten
+  }
+
+  /**
+   * Sorts the expected tuples with `ordering` and renders them one per line, without
+   * parentheses, for a textual comparison with the collected rows.
+   */
+  def sortExpectedly(dataSet: List[Product])(implicit ordering: Ordering[Product]): String = {
+    dataSet.sorted(ordering).mkString("\n").replaceAll("[\\(\\)]", "")
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a5ad7d91/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index a5112ec..4dda4cf 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -430,14 +430,23 @@ public class TestBaseUtils extends TestLogger {
// --------------------------------------------------------------------------------------------
public static <T> void compareResultAsTuples(List<T> result, String expected) {
- compareResult(result, expected, true);
+ compareResult(result, expected, true, true);
}
public static <T> void compareResultAsText(List<T> result, String expected) {
- compareResult(result, expected, false);
+ compareResult(result, expected,
+ false, true);
+ }
+
+ /**
+ * Compares {@code result} against the newline-separated lines of {@code expected} in order:
+ * unlike {@link #compareResultAsText}, neither side is sorted before the comparison.
+ */
+ public static <T> void compareOrderedResultAsText(List<T> result, String expected) {
+ compareResult(result, expected, false, false);
+ }
+
+ /**
+ * Order-preserving variant of the result comparison (no sorting of either side).
+ *
+ * @param asTuples forwarded to compareResult; {@code true} selects the same tuple handling
+ *                 that {@link #compareResultAsTuples} uses
+ */
+ public static <T> void compareOrderedResultAsText(List<T> result, String expected, boolean asTuples) {
+ compareResult(result, expected, asTuples, false);
+ }
- private static <T> void compareResult(List<T> result, String expected, boolean asTuples) {
+ private static <T> void compareResult(List<T> result, String expected, boolean asTuples, boolean sort) {
String[] expectedStrings = expected.split("\n");
String[] resultStrings = new String[result.size()];
@@ -466,8 +475,10 @@ public class TestBaseUtils extends TestLogger {
assertEquals("Wrong number of elements result", expectedStrings.length, resultStrings.length);
- Arrays.sort(expectedStrings);
- Arrays.sort(resultStrings);
+ if (sort) {
+ Arrays.sort(expectedStrings);
+ Arrays.sort(resultStrings);
+ }
for (int i = 0; i < expectedStrings.length; i++) {
assertEquals(expectedStrings[i], resultStrings[i]);