Posted to commits@flink.apache.org by fh...@apache.org on 2016/04/29 09:10:39 UTC

flink git commit: [FLINK-2946] Add support for orderBy() to batch Table API and SQL.

Repository: flink
Updated Branches:
  refs/heads/master c81151e4e -> a5ad7d913


[FLINK-2946] Add support for orderBy() to batch Table API and SQL.

This closes #1926.

Closed because CsvTableSource has been implemented.
This closes #939.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a5ad7d91
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a5ad7d91
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a5ad7d91

Branch: refs/heads/master
Commit: a5ad7d913c4626c355ecc0a8d25f5b9a8c777a85
Parents: c81151e
Author: dawidwys <wy...@gmail.com>
Authored: Fri Apr 22 21:31:28 2016 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Apr 28 23:59:28 2016 +0200

----------------------------------------------------------------------
 .../flink/api/scala/table/expressionDsl.scala   |   3 +
 .../table/expressions/ExpressionParser.scala    |  12 +-
 .../flink/api/table/expressions/ordering.scala  |  43 ++++++
 .../table/plan/nodes/dataset/BatchScan.scala    |  27 +---
 .../plan/nodes/dataset/DataSetAggregate.scala   |  35 +----
 .../table/plan/nodes/dataset/DataSetRel.scala   |  39 ++++-
 .../table/plan/nodes/dataset/DataSetSort.scala  | 127 ++++++++++++++++
 .../api/table/plan/rules/FlinkRuleSets.scala    |   1 +
 .../plan/rules/dataSet/DataSetSortRule.scala    |  61 ++++++++
 .../org/apache/flink/api/table/table.scala      |  51 ++++++-
 .../flink/api/scala/table/test/SortITCase.scala | 147 +++++++++++++++++++
 .../apache/flink/test/util/TestBaseUtils.java   |  21 ++-
 12 files changed, 505 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
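
For context, the feature in action: a minimal sketch assembled from the docs
and tests in this commit (the 3-tuple data set and the fields _1, _2 are the
ones used in SortITCase below):

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)
    val ds = CollectionDataSets.get3TupleDataSet(env)

    // Table API: a globally sorted result across all parallel partitions
    val sorted = ds.toTable(tEnv).orderBy('_1.asc, '_2.desc)

    // SQL: ORDER BY on a registered DataSet
    tEnv.registerDataSet("MyTable", ds)
    val sortedSql = tEnv.sql("SELECT * FROM MyTable ORDER BY _1 DESC, _2 DESC")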


http://git-wip-us.apache.org/repos/asf/flink/blob/a5ad7d91/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
index c6f14f3..c4bc1be 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
@@ -65,6 +65,9 @@ trait ImplicitExpressionOperations {
 
   def as(name: Symbol) = Naming(expr, name.name)
 
+  def asc = Asc(expr)
+  def desc = Desc(expr)
+
   /**
     * Conditional operator that decides which of two other expressions should be evaluated
    * based on an evaluated boolean condition.
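
These two DSL methods are the user-facing change here: any expression gains
.asc and .desc, wrapping it in the Asc/Desc ordering expressions added in
ordering.scala below. Schematically:

    'name.asc    // Asc(UnresolvedFieldReference("name"))
    'name.desc   // Desc(UnresolvedFieldReference("name"))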

http://git-wip-us.apache.org/repos/asf/flink/blob/a5ad7d91/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
index a20a8e9..ffadca5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
@@ -52,6 +52,8 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   lazy val CAST: Keyword = Keyword("cast")
   lazy val NULL: Keyword = Keyword("Null")
   lazy val EVAL: Keyword = Keyword("eval")
+  lazy val ASC: Keyword = Keyword("asc")
+  lazy val DESC: Keyword = Keyword("desc")
 
   def functionIdent: ExpressionParser.Parser[String] =
     not(AS) ~ not(COUNT) ~ not(AVG) ~ not(MIN) ~ not(MAX) ~
@@ -124,6 +126,13 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   lazy val suffixIsNotNull: PackratParser[Expression] =
     composite <~ "." ~ IS_NOT_NULL ~ opt("()") ^^ { e => IsNotNull(e) }
 
+  lazy val suffixAsc: PackratParser[Expression] =
+    (atom <~ ".asc" ^^ { e => Asc(e) }) | (atom <~ ASC ^^ { e => Asc(e) })
+
+  lazy val suffixDesc: PackratParser[Expression] =
+    (atom <~ ".desc" ^^ { e => Desc(e) }) | (atom <~ DESC ^^ { e => Desc(e) })
+
+
   lazy val suffixSum: PackratParser[Expression] =
     composite <~ "." ~ SUM ~ opt("()") ^^ { e => Sum(e) }
 
@@ -181,7 +190,8 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
 
   lazy val suffixed: PackratParser[Expression] =
     suffixIsNull | suffixIsNotNull | suffixSum | suffixMin | suffixMax | suffixCount | suffixAvg |
-      suffixCast | suffixAs | suffixTrim | suffixTrimWithoutArgs | suffixEval | suffixFunctionCall
+      suffixCast | suffixAs | suffixTrim | suffixTrimWithoutArgs | suffixEval | suffixFunctionCall |
+        suffixAsc | suffixDesc
 
   // prefix operators
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a5ad7d91/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala
new file mode 100644
index 0000000..75fa078
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.expressions
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+
+abstract class Ordering extends UnaryExpression { self: Product =>
+}
+
+case class Asc(child: Expression) extends Ordering {
+  override def toString: String = s"($child).asc"
+
+  override def name: String = child.name + "-asc"
+
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    child.toRexNode
+  }
+}
+
+case class Desc(child: Expression) extends Ordering {
+  override def toString: String = s"($child).desc"
+
+  override def name: String = child.name + "-desc"
+
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.desc(child.toRexNode)
+  }
+}
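
Note the asymmetry: Asc.toRexNode forwards the child's RexNode unchanged,
because ascending is Calcite's default direction for a bare sort key in
RelBuilder.sort, while Desc wraps the key with relBuilder.desc(...).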

http://git-wip-us.apache.org/repos/asf/flink/blob/a5ad7d91/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala
index 715109e..b18d674 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala
@@ -81,31 +81,14 @@ abstract class BatchScan(
 
         // conversion
         if (determinedType != inputType) {
-          val generator = new CodeGenerator(
-            config,
-            input.getType,
-            flinkTable.fieldIndexes)
 
-          val conversion = generator.generateConverterResultExpression(
+          val mapFunc = getConversionMapper(
+            config,
+            inputType,
             determinedType,
-            getRowType.getFieldNames)
-
-          val body =
-            s"""
-               |${conversion.code}
-               |return ${conversion.resultTerm};
-               |""".stripMargin
-
-          val genFunction = generator.generateFunction(
             "DataSetSourceConversion",
-            classOf[MapFunction[Any, Any]],
-            body,
-            determinedType)
-
-          val mapFunc = new MapRunner[Any, Any](
-            genFunction.name,
-            genFunction.code,
-            genFunction.returnType)
+            getRowType.getFieldNames,
+            Some(flinkTable.fieldIndexes))
 
           val opName = s"from: (${rowType.getFieldNames.asScala.toList.mkString(", ")})"
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a5ad7d91/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
index 12095a2..114122b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
@@ -143,7 +143,12 @@ class DataSetAggregate(
     expectedType match {
       case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
         val mapName = s"convert: (${rowType.getFieldNames.asScala.toList.mkString(", ")})"
-        result.map(typeConversion(config, rowTypeInfo, expectedType.get))
+        result.map(getConversionMapper(config,
+          rowTypeInfo.asInstanceOf[TypeInformation[Any]],
+          expectedType.get,
+          "AggregateOutputConversion",
+          rowType.getFieldNames.asScala
+        ))
         .name(mapName)
       case _ => result
     }
@@ -180,32 +185,4 @@ class DataSetAggregate(
     }.mkString(", ")
   }
 
-  private def typeConversion(
-      config: TableConfig,
-      rowTypeInfo: RowTypeInfo,
-      expectedType: TypeInformation[Any]): MapFunction[Any, Any] = {
-
-    val generator = new CodeGenerator(config, rowTypeInfo.asInstanceOf[TypeInformation[Any]])
-    val conversion = generator.generateConverterResultExpression(
-      expectedType, rowType.getFieldNames.asScala)
-
-    val body =
-      s"""
-          |${conversion.code}
-          |return ${conversion.resultTerm};
-          |""".stripMargin
-
-    val genFunction = generator.generateFunction(
-      "AggregateOutputConversion",
-      classOf[MapFunction[Any, Any]],
-      body,
-      expectedType)
-
-    new MapRunner[Any, Any](
-      genFunction.name,
-      genFunction.code,
-      genFunction.returnType)
-
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a5ad7d91/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
index e8f81fd..7c76e46 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
@@ -21,10 +21,13 @@ package org.apache.flink.api.table.plan.nodes.dataset
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.{BatchTableEnvironment, TableEnvironment, TableConfig}
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig, TableEnvironment}
 import org.apache.flink.api.table.plan.nodes.FlinkRel
+import org.apache.flink.api.table.runtime.MapRunner
 
 import scala.collection.JavaConversions._
 
@@ -64,4 +67,38 @@ trait DataSetRel extends RelNode with FlinkRel {
 
   }
 
+  private[dataset] def getConversionMapper(
+      config: TableConfig,
+      inputType: TypeInformation[Any],
+      expectedType: TypeInformation[Any],
+      conversionOperatorName: String,
+      fieldNames: Seq[String],
+      inputPojoFieldMapping: Option[Array[Int]] = None): MapFunction[Any, Any] = {
+
+    val generator = new CodeGenerator(
+      config,
+      inputType,
+      None,
+      inputPojoFieldMapping)
+    val conversion = generator.generateConverterResultExpression(expectedType, fieldNames)
+
+    val body =
+      s"""
+         |${conversion.code}
+         |return ${conversion.resultTerm};
+         |""".stripMargin
+
+    val genFunction = generator.generateFunction(
+      conversionOperatorName,
+      classOf[MapFunction[Any, Any]],
+      body,
+      expectedType)
+
+    new MapRunner[Any, Any](
+      genFunction.name,
+      genFunction.code,
+      genFunction.returnType)
+
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a5ad7d91/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
new file mode 100644
index 0000000..ef89b06
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.java.typeutils.PojoTypeInfo
+import org.apache.flink.api.table.BatchTableEnvironment
+import org.apache.flink.api.table.typeutils.TypeConverter._
+
+import scala.collection.JavaConverters._
+
+class DataSetSort(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inp: RelNode,
+    collations: RelCollation,
+    rowType2: RelDataType)
+  extends SingleRel(cluster, traitSet, inp)
+  with DataSetRel {
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
+    new DataSetSort(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      collations,
+      rowType2
+    )
+  }
+
+  override def translateToPlan(
+              tableEnv: BatchTableEnvironment,
+              expectedType: Option[TypeInformation[Any]] = None): DataSet[Any] = {
+
+    val config = tableEnv.getConfig
+
+    val inputDS = inp.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+
+    val currentParallelism = inputDS.getExecutionEnvironment.getParallelism
+    var partitionedDs = if (currentParallelism == 1) {
+      inputDS
+    } else {
+      inputDS.partitionByRange(fieldCollations.map(_._1): _*)
+        .withOrders(fieldCollations.map(_._2): _*)
+    }
+
+    fieldCollations.foreach { fieldCollation =>
+      partitionedDs = partitionedDs.sortPartition(fieldCollation._1, fieldCollation._2)
+    }
+
+    val inputType = partitionedDs.getType
+    expectedType match {
+
+      case None if config.getEfficientTypeUsage =>
+        partitionedDs
+
+      case _ =>
+        val determinedType = determineReturnType(
+          getRowType,
+          expectedType,
+          config.getNullCheck,
+          config.getEfficientTypeUsage)
+
+        // conversion
+        if (determinedType != inputType) {
+
+          val mapFunc = getConversionMapper(config,
+            partitionedDs.getType,
+            determinedType,
+            "DataSetSortConversion",
+            getRowType.getFieldNames.asScala
+          )
+
+          partitionedDs.map(mapFunc)
+        }
+        // no conversion necessary, forward
+        else {
+          partitionedDs
+        }
+    }
+  }
+
+  private def directionToOrder(direction: Direction) = {
+    direction match {
+      case Direction.ASCENDING | Direction.STRICTLY_ASCENDING => Order.ASCENDING
+      case Direction.DESCENDING | Direction.STRICTLY_DESCENDING => Order.DESCENDING
+    }
+
+  }
+
+  private val fieldCollations = collations.getFieldCollations.asScala
+    .map(c => (c.getFieldIndex, directionToOrder(c.getDirection)))
+
+  private val sortFieldsToString = fieldCollations
+    .map(col => s"${rowType2.getFieldNames.get(col._1)} ${col._2.getShortName}" ).mkString(", ")
+
+  override def toString: String = s"Sort(by: $sortFieldsToString)"
+
+  override def explainTerms(pw: RelWriter) : RelWriter = {
+    super.explainTerms(pw)
+      .item("orderBy", sortFieldsToString)
+  }
+}
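
The translation follows the standard distributed sort recipe: unless the job
runs with parallelism 1, the input is range-partitioned on the sort keys and
each partition is then sorted locally, which yields a globally sorted result
across partitions. Written directly against the DataSet API, the equivalent
of ORDER BY _1 ASC, _2 DESC would be (a schematic sketch using the same
primitives as translateToPlan above, for some input: DataSet[(Int, Long)]):

    import org.apache.flink.api.common.operators.Order

    val globallySorted = input
      .partitionByRange(0, 1)
      .withOrders(Order.ASCENDING, Order.DESCENDING)
      .sortPartition(0, Order.ASCENDING)
      .sortPartition(1, Order.DESCENDING)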

http://git-wip-us.apache.org/repos/asf/flink/blob/a5ad7d91/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
index 06a8a84..5d5912b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
@@ -102,6 +102,7 @@ object FlinkRuleSets {
     DataSetJoinRule.INSTANCE,
     DataSetScanRule.INSTANCE,
     DataSetUnionRule.INSTANCE,
+    DataSetSortRule.INSTANCE,
     DataSetValuesRule.INSTANCE,
     BatchTableSourceScanRule.INSTANCE
   )

http://git-wip-us.apache.org/repos/asf/flink/blob/a5ad7d91/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala
new file mode 100644
index 0000000..b7f70e3
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.logical.{LogicalJoin, LogicalSort}
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetSort}
+
+class DataSetSortRule
+  extends ConverterRule(
+    classOf[LogicalSort],
+    Convention.NONE,
+    DataSetConvention.INSTANCE,
+    "DataSetSortRule") {
+
+  /**
+    * Only translates when no OFFSET or LIMIT is specified.
+    */
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val sort = call.rel(0).asInstanceOf[LogicalSort]
+    sort.offset == null && sort.fetch == null
+  }
+
+  override def convert(rel: RelNode): RelNode = {
+
+    val sort: LogicalSort = rel.asInstanceOf[LogicalSort]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+    val convInput: RelNode = RelOptRule.convert(sort.getInput, DataSetConvention.INSTANCE)
+
+    new DataSetSort(
+      rel.getCluster,
+      traitSet,
+      convInput,
+      sort.getCollation,
+      rel.getRowType
+    )
+  }
+}
+
+object DataSetSortRule {
+  val INSTANCE: RelOptRule = new DataSetSortRule
+}
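
Because matches() bails out when offset or fetch is set, ORDER BY queries
with OFFSET or LIMIT/FETCH clauses are deliberately left untranslated by
this rule for now.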

http://git-wip-us.apache.org/repos/asf/flink/blob/a5ad7d91/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
index 6485139..68e1041 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
@@ -21,14 +21,13 @@ import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataTypeField
 import org.apache.calcite.rel.core.JoinRelType
 import org.apache.calcite.rel.logical.LogicalProject
-import org.apache.calcite.rex.{RexInputRef, RexLiteral, RexCall, RexNode}
+import org.apache.calcite.rex.{RexCall, RexInputRef, RexLiteral, RexNode}
 import org.apache.calcite.sql.SqlKind
 import org.apache.calcite.tools.RelBuilder.{AggCall, GroupKey}
 import org.apache.calcite.util.NlsString
 import org.apache.flink.api.table.plan.PlanGenException
 import org.apache.flink.api.table.plan.RexNodeTranslator.extractAggCalls
-import org.apache.flink.api.table.expressions.{ExpressionParser, Naming,
-          UnresolvedFieldReference, Expression}
+import org.apache.flink.api.table.expressions._
 
 import scala.collection.mutable
 import scala.collection.JavaConverters._
@@ -209,7 +208,7 @@ class Table(
 
     relBuilder.push(relNode)
     relBuilder.filter(predicate.toRexNode(relBuilder))
-    
+
     new Table(relBuilder.build(), tableEnv)
   }
 
@@ -401,6 +400,50 @@ class Table(
     new Table(relBuilder.build(), tableEnv)
   }
 
+  /**
+    * Sorts the given [[Table]]. Similar to SQL ORDER BY.
+    * The resulting Table is globally sorted across all parallel partitions.
+    *
+    * Example:
+    *
+    * {{{
+    *   tab.orderBy('name.desc)
+    * }}}
+    */
+  def orderBy(fields: Expression*): Table = {
+    relBuilder.push(relNode)
+
+    if (!fields.forall {
+      case _: UnresolvedFieldReference => true
+      case x: Ordering => x.child.isInstanceOf[UnresolvedFieldReference]
+      case _ => false
+    }) {
+      throw new IllegalArgumentException("All expressions must be field references " +
+        "or asc/desc expressions.")
+    }
+
+    val exprs = fields.map(_.toRexNode(relBuilder))
+
+    relBuilder.sort(exprs.asJava)
+    new Table(relBuilder.build(), tableEnv)
+
+  }
+
+  /**
+    * Sorts the given [[Table]]. Similar to SQL ORDER BY.
+    * The resulting Table is globally sorted across all parallel partitions.
+    *
+    * Example:
+    *
+    * {{{
+    *   tab.orderBy("name DESC")
+    * }}}
+    */
+  def orderBy(fields: String): Table = {
+    val parsedFields = ExpressionParser.parseExpressionList(fields)
+    orderBy(parsedFields: _*)
+  }
+
   private def createRenamingProject(exprs: Seq[RexNode]): LogicalProject = {
 
     val names = exprs.map{ e =>
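
Both orderBy variants accept only field references, optionally wrapped in
asc/desc; anything else fails the forall check above and throws
IllegalArgumentException. Schematically (illustrative field names):

    tab.orderBy('name.desc)          // ok
    tab.orderBy('id, 'name.asc)      // ok: bare fields sort ascending
    tab.orderBy("name DESC, id")     // ok: string variant, parsed as above
    tab.orderBy(('id + 1).asc)       // IllegalArgumentException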

http://git-wip-us.apache.org/repos/asf/flink/blob/a5ad7d91/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala
new file mode 100644
index 0000000..94361c6
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table.test
+
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.api.scala.{ExecutionEnvironment, _}
+import org.apache.flink.api.table.{Row, TableEnvironment}
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class SortITCase(
+    mode: TestExecutionMode,
+    configMode: TableConfigMode)
+  extends TableProgramsTestBase(mode, configMode) {
+
+  def getExecutionEnvironment = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(4)
+    env
+  }
+
+  val tupleDataSetStrings = List((1, 1L, "Hi")
+    ,(2, 2L, "Hello")
+    ,(3, 2L, "Hello world")
+    ,(4, 3L, "Hello world, how are you?")
+    ,(5, 3L, "I am fine.")
+    ,(6, 3L, "Luke Skywalker")
+    ,(7, 4L, "Comment#1")
+    ,(8, 4L, "Comment#2")
+    ,(9, 4L, "Comment#3")
+    ,(10, 4L, "Comment#4")
+    ,(11, 5L, "Comment#5")
+    ,(12, 5L, "Comment#6")
+    ,(13, 5L, "Comment#7")
+    ,(14, 5L, "Comment#8")
+    ,(15, 5L, "Comment#9")
+    ,(16, 6L, "Comment#10")
+    ,(17, 6L, "Comment#11")
+    ,(18, 6L, "Comment#12")
+    ,(19, 6L, "Comment#13")
+    ,(20, 6L, "Comment#14")
+    ,(21, 6L, "Comment#15"))
+
+  @Test
+  def testOrderByDesc(): Unit = {
+    val env = getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    val t = ds.toTable(tEnv).orderBy('_1.desc)
+    implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+      - x.productElement(0).asInstanceOf[Int])
+
+    val expected = sortExpectedly(tupleDataSetStrings)
+    val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+
+    val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
+
+    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
+  }
+
+  @Test
+  def testOrderByAsc(): Unit = {
+    val env = getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    val t = ds.toTable(tEnv).orderBy('_1.asc)
+    implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+      x.productElement(0).asInstanceOf[Int])
+
+    val expected = sortExpectedly(tupleDataSetStrings)
+    val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+
+    val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
+
+    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
+  }
+
+  @Test
+  def testOrderByMultipleFieldsDifferentDirections(): Unit = {
+    val env = getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    val t = ds.toTable(tEnv).orderBy('_1.asc, '_2.desc)
+    implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+      (x.productElement(0).asInstanceOf[Int], - x.productElement(1).asInstanceOf[Long]))
+
+    val expected = sortExpectedly(tupleDataSetStrings)
+    val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+
+    val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
+
+    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
+  }
+
+  @Test
+  def testOrderByMultipleFieldsWithSql(): Unit = {
+    val env = getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 DESC, _2 DESC"
+    implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+      (- x.productElement(0).asInstanceOf[Int], - x.productElement(1).asInstanceOf[Long]))
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    tEnv.registerDataSet("MyTable", ds)
+
+    val expected = sortExpectedly(tupleDataSetStrings)
+    val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+
+    val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
+
+    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
+  }
+
+  def sortExpectedly(dataSet: List[Product])(implicit ordering: Ordering[Product]): String = {
+    dataSet.sorted(ordering).mkString("\n").replaceAll("[\\(\\)]", "")
+  }
+
+}
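
The tests avoid assuming how rows are distributed across partitions: each
partition is collected as one Seq, empty partitions are dropped, the
remaining per-partition seqs are ordered by their first element and
concatenated, and the result is checked with compareOrderedResultAsText.
This verifies both the local order within each partition and the range
partitioning between partitions.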

http://git-wip-us.apache.org/repos/asf/flink/blob/a5ad7d91/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index a5112ec..4dda4cf 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -430,14 +430,23 @@ public class TestBaseUtils extends TestLogger {
 	// --------------------------------------------------------------------------------------------
 
 	public static <T> void compareResultAsTuples(List<T> result, String expected) {
-		compareResult(result, expected, true);
+		compareResult(result, expected, true, true);
 	}
 
 	public static <T> void compareResultAsText(List<T> result, String expected) {
-		compareResult(result, expected, false);
+		compareResult(result, expected,
+				false, true);
+	}
+
+	public static <T> void compareOrderedResultAsText(List<T> result, String expected) {
+		compareResult(result, expected, false, false);
+	}
+
+	public static <T> void compareOrderedResultAsText(List<T> result, String expected, boolean asTuples) {
+		compareResult(result, expected, asTuples, false);
 	}
 	
-	private static <T> void compareResult(List<T> result, String expected, boolean asTuples) {
+	private static <T> void compareResult(List<T> result, String expected, boolean asTuples, boolean sort) {
 		String[] expectedStrings = expected.split("\n");
 		String[] resultStrings = new String[result.size()];
 		
@@ -466,8 +475,10 @@ public class TestBaseUtils extends TestLogger {
 		
 		assertEquals("Wrong number of elements result", expectedStrings.length, resultStrings.length);
 
-		Arrays.sort(expectedStrings);
-		Arrays.sort(resultStrings);
+		if (sort) {
+			Arrays.sort(expectedStrings);
+			Arrays.sort(resultStrings);
+		}
 		
 		for (int i = 0; i < expectedStrings.length; i++) {
 			assertEquals(expectedStrings[i], resultStrings[i]);
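
Unlike compareResultAsText, which sorts both sides before comparing, the new
compareOrderedResultAsText keeps element order intact, so tests can assert a
globally sorted result line by line.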