You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2016/08/11 08:28:10 UTC

[1/2] flink git commit: [FLINK-3940] [table] Additional improvements

Repository: flink
Updated Branches:
  refs/heads/master dc5062557 -> bdd7a114d


[FLINK-3940] [table] Additional improvements

- Improve overflow handling (support for more records than Int.MaxValue)
- SQL LIMIT support
- Bug fixing and improved docs


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bdd7a114
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bdd7a114
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bdd7a114

Branch: refs/heads/master
Commit: bdd7a114d9411e2bda51ad296061c5fca742dc8b
Parents: 0472cb9
Author: twalthr <tw...@apache.org>
Authored: Wed Aug 10 23:49:27 2016 +0200
Committer: twalthr <tw...@apache.org>
Committed: Thu Aug 11 10:27:11 2016 +0200

----------------------------------------------------------------------
 docs/apis/table.md                              | 33 ++++++--
 .../api/table/plan/logical/operators.scala      | 45 +++++-----
 .../table/plan/nodes/dataset/DataSetSort.scala  | 86 ++++++++++++++------
 .../table/runtime/CountPartitionFunction.scala  | 10 ++-
 .../api/table/runtime/LimitFilterFunction.scala | 42 +++++++---
 .../org/apache/flink/api/table/table.scala      | 70 ++++++++--------
 .../flink/api/scala/batch/sql/SortITCase.scala  | 36 +++++++-
 .../api/scala/batch/table/SortITCase.scala      | 33 +++++++-
 .../scala/stream/table/UnsupportedOpsTest.scala |  8 ++
 9 files changed, 254 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bdd7a114/docs/apis/table.md
----------------------------------------------------------------------
diff --git a/docs/apis/table.md b/docs/apis/table.md
index 6793fde..57252d9 100644
--- a/docs/apis/table.md
+++ b/docs/apis/table.md
@@ -668,7 +668,7 @@ Table result = left.minusAll(right);
     <tr>
       <td><strong>Distinct</strong></td>
       <td>
-        <p>Similar to a SQL DISTINCT clause. Returns rows with distinct value combinations.</p>
+        <p>Similar to a SQL DISTINCT clause. Returns records with distinct value combinations.</p>
 {% highlight java %}
 Table in = tableEnv.fromDataSet(ds, "a, b, c");
 Table result = in.distinct();
@@ -679,7 +679,7 @@ Table result = in.distinct();
     <tr>
       <td><strong>Order By</strong></td>
       <td>
-        <p>Similar to a SQL ORDER BY clause. Returns rows globally sorted across all parallel partitions.</p>
+        <p>Similar to a SQL ORDER BY clause. Returns records globally sorted across all parallel partitions.</p>
 {% highlight java %}
 Table in = tableEnv.fromDataSet(ds, "a, b, c");
 Table result = in.orderBy("a.asc");
@@ -690,15 +690,15 @@ Table result = in.orderBy("a.asc");
     <tr>
       <td><strong>Limit</strong></td>
       <td>
-        <p>Similar to a SQL LIMIT clause. Returns specified number of rows from offset position. It is technically part of the ORDER BY clause.</p>
+        <p>Similar to a SQL LIMIT clause. Limits a sorted result to a specified number of records from an offset position. Limit is technically part of the Order By operator and thus must be preceded by it.</p>
 {% highlight java %}
 Table in = tableEnv.fromDataSet(ds, "a, b, c");
-Table result = in.orderBy("a.asc").limit(3);
+Table result = in.orderBy("a.asc").limit(3); // returns an unlimited number of records beginning with the 4th record
 {% endhighlight %}
 or
 {% highlight java %}
 Table in = tableEnv.fromDataSet(ds, "a, b, c");
-Table result = in.orderBy("a.asc").limit(3, 5);
+Table result = in.orderBy("a.asc").limit(3, 5); // returns 5 records beginning with the 4th record 
 {% endhighlight %}
       </td>
     </tr>
@@ -890,7 +890,7 @@ val result = left.minusAll(right);
     <tr>
       <td><strong>Distinct</strong></td>
       <td>
-        <p>Similar to a SQL DISTINCT clause. Returns rows with distinct value combinations.</p>
+        <p>Similar to a SQL DISTINCT clause. Returns records with distinct value combinations.</p>
 {% highlight scala %}
 val in = ds.toTable(tableEnv, 'a, 'b, 'c);
 val result = in.distinct();
@@ -901,7 +901,7 @@ val result = in.distinct();
     <tr>
       <td><strong>Order By</strong></td>
       <td>
-        <p>Similar to a SQL ORDER BY clause. Returns rows globally sorted across all parallel partitions.</p>
+        <p>Similar to a SQL ORDER BY clause. Returns records globally sorted across all parallel partitions.</p>
 {% highlight scala %}
 val in = ds.toTable(tableEnv, 'a, 'b, 'c);
 val result = in.orderBy('a.asc);
@@ -909,6 +909,22 @@ val result = in.orderBy('a.asc);
       </td>
     </tr>
 
+    <tr>
+      <td><strong>Limit</strong></td>
+      <td>
+        <p>Similar to a SQL LIMIT clause. Limits a sorted result to a specified number of records from an offset position. Limit is technically part of the Order By operator and thus must be preceded by it.</p>
+{% highlight scala %}
+val in = ds.toTable(tableEnv, 'a, 'b, 'c);
+val result = in.orderBy('a.asc).limit(3); // returns an unlimited number of records beginning with the 4th record
+{% endhighlight %}
+or
+{% highlight scala %}
+val in = ds.toTable(tableEnv, 'a, 'b, 'c);
+val result = in.orderBy('a.asc).limit(3, 5); // returns 5 records beginning with the 4th record 
+{% endhighlight %}
+      </td>
+    </tr>
+
   </tbody>
 </table>
 </div>
@@ -1087,6 +1103,9 @@ query:
       | query INTERSECT query
     }
     [ ORDER BY orderItem [, orderItem ]* ]
+    [ LIMIT { count | ALL } ]
+    [ OFFSET start { ROW | ROWS } ]
+    [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY ]
 
 orderItem:
   expression [ ASC | DESC ]

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd7a114/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
index 0d4cf2c..79c3cb1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
@@ -19,8 +19,8 @@ package org.apache.flink.api.table.plan.logical
 
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.logical.{LogicalSort, LogicalProject}
-import org.apache.calcite.rex.{RexLiteral, RexInputRef, RexNode}
+import org.apache.calcite.rel.logical.LogicalProject
+import org.apache.calcite.rex.{RexInputRef, RexNode}
 import org.apache.calcite.tools.RelBuilder
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -40,9 +40,9 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extend
     val newProjectList =
       afterResolve.projectList.zipWithIndex.map { case (e, i) =>
         e match {
-          case u @ UnresolvedAlias(child) => child match {
+          case u @ UnresolvedAlias(c) => c match {
             case ne: NamedExpression => ne
-            case e if !e.valid => u
+            case expr if !expr.valid => u
             case c @ Cast(ne: NamedExpression, tp) => Alias(c, s"${ne.name}-$tp")
             case other => Alias(other, s"_c$i")
           }
@@ -62,14 +62,14 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extend
         case n: Alias =>
           // explicit name
           if (names.contains(n.name)) {
-            throw new ValidationException(s"Duplicate field name $n.name.")
+            throw ValidationException(s"Duplicate field name $n.name.")
           } else {
             names.add(n.name)
           }
         case r: ResolvedFieldReference =>
           // simple field forwarding
           if (names.contains(r.name)) {
-            throw new ValidationException(s"Duplicate field name $r.name.")
+            throw ValidationException(s"Duplicate field name $r.name.")
           } else {
             names.add(r.name)
           }
@@ -98,10 +98,10 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extend
 
 case class AliasNode(aliasList: Seq[Expression], child: LogicalNode) extends UnaryNode {
   override def output: Seq[Attribute] =
-    throw new UnresolvedException("Invalid call to output on AliasNode")
+    throw UnresolvedException("Invalid call to output on AliasNode")
 
   override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder =
-    throw new UnresolvedException("Invalid call to toRelNode on AliasNode")
+    throw UnresolvedException("Invalid call to toRelNode on AliasNode")
 
   override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = {
     if (aliasList.length > child.output.length) {
@@ -150,7 +150,7 @@ case class Sort(order: Seq[Ordering], child: LogicalNode) extends UnaryNode {
   }
 }
 
-case class Limit(offset: Int, fetch: Int, child: LogicalNode) extends UnaryNode {
+case class Limit(offset: Int, fetch: Int = -1, child: LogicalNode) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
 
   override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
@@ -160,10 +160,13 @@ case class Limit(offset: Int, fetch: Int, child: LogicalNode) extends UnaryNode
 
   override def validate(tableEnv: TableEnvironment): LogicalNode = {
     if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
-      throw new TableException(s"Limit on stream tables is currently not supported.")
+      failValidation(s"Limit on stream tables is currently not supported.")
     }
     if (!child.validate(tableEnv).isInstanceOf[Sort]) {
-      throw new TableException(s"Limit operator must follow behind orderBy clause.")
+      failValidation(s"Limit operator must be preceded by an OrderBy operator.")
+    }
+    if (offset < 0) {
+      failValidation(s"Offset should be greater than or equal to zero.")
     }
     super.validate(tableEnv)
   }
@@ -193,11 +196,9 @@ case class Aggregate(
     child: LogicalNode) extends UnaryNode {
 
   override def output: Seq[Attribute] = {
-    (groupingExpressions ++ aggregateExpressions) map { agg =>
-      agg match {
-        case ne: NamedExpression => ne.toAttribute
-        case e => Alias(e, e.toString).toAttribute
-      }
+    (groupingExpressions ++ aggregateExpressions) map {
+      case ne: NamedExpression => ne.toAttribute
+      case e => Alias(e, e.toString).toAttribute
     }
   }
 
@@ -205,11 +206,9 @@ case class Aggregate(
     child.construct(relBuilder)
     relBuilder.aggregate(
       relBuilder.groupKey(groupingExpressions.map(_.toRexNode(relBuilder)).asJava),
-      aggregateExpressions.map { e =>
-        e match {
-          case Alias(agg: Aggregation, name) => agg.toAggCall(name)(relBuilder)
-          case _ => throw new RuntimeException("This should never happen.")
-        }
+      aggregateExpressions.map {
+        case Alias(agg: Aggregation, name) => agg.toAggCall(name)(relBuilder)
+        case _ => throw new RuntimeException("This should never happen.")
       }.asJava)
   }
 
@@ -403,7 +402,7 @@ case class Join(
         right)
     }
     val resolvedCondition = node.condition.map(_.postOrderTransform(partialFunction))
-    new Join(node.left, node.right, node.joinType, resolvedCondition)
+    Join(node.left, node.right, node.joinType, resolvedCondition)
   }
 
   override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
@@ -429,7 +428,7 @@ case class Join(
       failValidation(s"join relations with ambiguous names: ${ambiguousName.mkString(", ")}")
     }
 
-    resolvedJoin.condition.foreach(testJoinCondition(_))
+    resolvedJoin.condition.foreach(testJoinCondition)
     resolvedJoin
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd7a114/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
index ef3005c..22930e7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
@@ -28,8 +28,8 @@ import org.apache.calcite.rex.{RexLiteral, RexNode}
 import org.apache.flink.api.common.operators.Order
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.BatchTableEnvironment
-import org.apache.flink.api.table.runtime.{LimitFilterFunction, CountPartitionFunction}
+import org.apache.flink.api.table.{BatchTableEnvironment, TableException}
+import org.apache.flink.api.table.runtime.{CountPartitionFunction, LimitFilterFunction}
 import org.apache.flink.api.table.typeutils.TypeConverter._
 
 import scala.collection.JavaConverters._
@@ -40,12 +40,24 @@ class DataSetSort(
     inp: RelNode,
     collations: RelCollation,
     rowType2: RelDataType,
-    offset: RexNode,             
+    offset: RexNode,
     fetch: RexNode)
   extends SingleRel(cluster, traitSet, inp)
-  with DataSetRel{
+  with DataSetRel {
 
-  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode ={
+  private val limitStart: Long = if (offset != null) {
+    RexLiteral.intValue(offset)
+  } else {
+    0L
+  }
+
+  private val limitEnd: Long = if (fetch != null) {
+    RexLiteral.intValue(fetch) + limitStart
+  } else {
+    Long.MaxValue
+  }
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
     new DataSetSort(
       cluster,
       traitSet,
@@ -58,18 +70,24 @@ class DataSetSort(
   }
 
   override def translateToPlan(
-              tableEnv: BatchTableEnvironment,
-              expectedType: Option[TypeInformation[Any]] = None): DataSet[Any] = {
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]] = None)
+    : DataSet[Any] = {
+
+    if (fieldCollations.isEmpty) {
+      throw TableException("Limiting the result without sorting is not allowed " +
+        "as it could lead to arbitrary results.")
+    }
 
     val config = tableEnv.getConfig
 
-    val inputDS = inp.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+    val inputDs = inp.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
 
-    val currentParallelism = inputDS.getExecutionEnvironment.getParallelism
+    val currentParallelism = inputDs.getExecutionEnvironment.getParallelism
     var partitionedDs = if (currentParallelism == 1) {
-      inputDS
+      inputDs
     } else {
-      inputDS.partitionByRange(fieldCollations.map(_._1): _*)
+      inputDs.partitionByRange(fieldCollations.map(_._1): _*)
         .withOrders(fieldCollations.map(_._2): _*)
     }
 
@@ -77,28 +95,37 @@ class DataSetSort(
       partitionedDs = partitionedDs.sortPartition(fieldCollation._1, fieldCollation._2)
     }
 
-    val limitedDS = if (offset == null && fetch == null) {
+    val limitedDs = if (offset == null && fetch == null) {
       partitionedDs
     } else {
-      val limitStart = if (offset != null) RexLiteral.intValue(offset) else 0
-      val limitEnd = if (fetch != null) RexLiteral.intValue(fetch) + limitStart else Int.MaxValue
-
       val countFunction = new CountPartitionFunction[Any]
-      val partitionCount = partitionedDs.mapPartition(countFunction)
+
+      val partitionCountName = s"prepare offset/fetch"
+
+      val partitionCount = partitionedDs
+        .mapPartition(countFunction)
+        .name(partitionCountName)
+
+      val broadcastName = "countPartition"
 
       val limitFunction = new LimitFilterFunction[Any](
         limitStart,
         limitEnd,
-        "countPartition")
-      partitionedDs.filter(limitFunction).withBroadcastSet(partitionCount, "countPartition")
-    }
+        broadcastName)
+
+      val limitName = s"offset: $offsetToString, fetch: $fetchToString"
 
+      partitionedDs
+        .filter(limitFunction)
+        .name(limitName)
+        .withBroadcastSet(partitionCount, broadcastName)
+    }
 
     val inputType = partitionedDs.getType
     expectedType match {
 
       case None if config.getEfficientTypeUsage =>
-        limitedDS
+        limitedDs
 
       case _ =>
         val determinedType = determineReturnType(
@@ -119,11 +146,13 @@ class DataSetSort(
             getRowType.getFieldNames.asScala
           )
 
-          limitedDS.map(mapFunc)
+          val opName = s"convert: (${rowType.getFieldNames.asScala.toList.mkString(", ")})"
+
+          limitedDs.map(mapFunc).name(opName)
         }
         // no conversion necessary, forward
         else {
-          limitedDS
+          limitedDs
         }
     }
   }
@@ -143,10 +172,21 @@ class DataSetSort(
   private val sortFieldsToString = fieldCollations
     .map(col => s"${rowType2.getFieldNames.get(col._1)} ${col._2.getShortName}" ).mkString(", ")
 
-  override def toString: String = s"Sort(by: $sortFieldsToString)"
+  private val offsetToString = s"$offset"
+
+  private val fetchToString = if (limitEnd == Long.MaxValue) {
+    "unlimited"
+  } else {
+    s"$limitEnd"
+  }
+
+  override def toString: String =
+    s"Sort(by: ($sortFieldsToString), offset: $offsetToString, fetch: $fetchToString)"
 
   override def explainTerms(pw: RelWriter) : RelWriter = {
     super.explainTerms(pw)
       .item("orderBy", sortFieldsToString)
+      .item("offset", offsetToString)
+      .item("fetch", fetchToString)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd7a114/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/CountPartitionFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/CountPartitionFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/CountPartitionFunction.scala
index 79b8623..5896f4c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/CountPartitionFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/CountPartitionFunction.scala
@@ -23,14 +23,16 @@ import java.lang.Iterable
 import org.apache.flink.api.common.functions.RichMapPartitionFunction
 import org.apache.flink.util.Collector
 
-class CountPartitionFunction[IN] extends RichMapPartitionFunction[IN, (Int, Int)] {
-  var elementCount = 0
+class CountPartitionFunction[IN] extends RichMapPartitionFunction[IN, (Int, Long)] {
 
-  override def mapPartition(value: Iterable[IN], out: Collector[(Int, Int)]): Unit = {
+  override def mapPartition(value: Iterable[IN], out: Collector[(Int, Long)]): Unit = {
     val partitionIndex = getRuntimeContext.getIndexOfThisSubtask
+    var elementCount = 0L
     val iterator = value.iterator()
     while (iterator.hasNext) {
-      elementCount += 1
+      if (elementCount != Long.MaxValue) { // prevent overflow
+        elementCount += 1L
+      }
       iterator.next()
     }
     out.collect(partitionIndex, elementCount)

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd7a114/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/LimitFilterFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/LimitFilterFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/LimitFilterFunction.scala
index 311b616..5ec9035 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/LimitFilterFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/LimitFilterFunction.scala
@@ -21,24 +21,44 @@ package org.apache.flink.api.table.runtime
 import org.apache.flink.api.common.functions.RichFilterFunction
 import org.apache.flink.configuration.Configuration
 
-import scala.collection.mutable
 import scala.collection.JavaConverters._
 
-class LimitFilterFunction[T](limitStart: Int,
-                             limitEnd: Int,
-                             broadcast: String) extends RichFilterFunction[T] {
-  var elementCount = 0
-  var countList = mutable.Buffer[Int]()
+
+class LimitFilterFunction[T](
+    limitStart: Long,
+    limitEnd: Long,
+    broadcastName: String)
+  extends RichFilterFunction[T] {
+
+  var partitionIndex: Int = _
+  var elementCount: Long = _
+  var countList: Array[Long] = _
 
   override def open(config: Configuration) {
-    countList = getRuntimeContext.getBroadcastVariable[(Int, Int)](broadcast).asScala
-      .sortWith(_._1 < _._1).map(_._2).scanLeft(0) (_ + _)
+    partitionIndex = getRuntimeContext.getIndexOfThisSubtask
+
+    val countPartitionResult = getRuntimeContext
+      .getBroadcastVariable[(Int, Long)](broadcastName)
+      .asScala
+
+    // sort by partition index, extract the count per partition, and compute the running (prefix) sums
+    countList = countPartitionResult.sortWith(_._1 < _._1).map(_._2).scanLeft(0L) { case (a, b) =>
+        val sum = a + b
+        if (sum < 0L) { // prevent overflow
+          Long.MaxValue
+        }
+        sum
+    }.toArray
+
+    elementCount = 0
   }
 
   override def filter(value: T): Boolean = {
-    val partitionIndex = getRuntimeContext.getIndexOfThisSubtask
-    elementCount += 1
+    if (elementCount != Long.MaxValue) { // prevent overflow
+      elementCount += 1L
+    }
+    // we filter out records that are not within the limit (Long.MaxValue is unlimited)
     limitStart - countList(partitionIndex) < elementCount &&
-      limitEnd - countList(partitionIndex) >= elementCount
+      (limitEnd == Long.MaxValue || limitEnd - countList(partitionIndex) >= elementCount)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd7a114/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
index c9fd78c..bfabd32 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
@@ -563,7 +563,7 @@ class Table(
     * Example:
     *
     * {{{
-    *   tab.orderBy("name DESC")
+    *   tab.orderBy("name.desc")
     * }}}
     */
   def orderBy(fields: String): Table = {
@@ -572,45 +572,39 @@ class Table(
   }
 
   /**
-   * LIMIT is called an argument since it is technically part of the ORDER BY clause.
-   * The statement is used to retrieve records from table and limit the number of records 
-   * returned based on a limit value.
-   *
-   * Example:
-   *
-   * {{{
-   *   tab.orderBy('name.desc).limit(3)
-   * }}}
-   * 
-   * @param offset  The number of rows to skip before including them in the result.
-   */
+    * Limits a sorted result from an offset position.
+    * Similar to a SQL LIMIT clause. Limit is technically part of the Order By operator and
+    * thus must be preceded by it.
+    *
+    * Example:
+    *
+    * {{{
+    *   // returns an unlimited number of records beginning with the 4th record
+    *   tab.orderBy('name.desc).limit(3)
+    * }}}
+    *
+    * @param offset number of records to skip
+    */
   def limit(offset: Int): Table = {
-    if (offset < 0) {
-      throw new ValidationException("Offset should be greater than or equal to zero.")
-    }
-    new Table(tableEnv, Limit(offset, -1, logicalPlan).validate(tableEnv))
-  }
-
-  /**
-   * LIMIT is called an argument since it is technically part of the ORDER BY clause.
-   * The statement is used to retrieve records from table and limit the number of records 
-   * returned based on a limit value.
-   *
-   * Example:
-   *
-   * {{{
-   *   tab.orderBy('name.desc).limit(3, 5)
-   * }}}
-   *
-   * @param offset The number of rows to skip before including them in the result.
-   * @param fetch The number of records returned.
-   */
+    new Table(tableEnv, Limit(offset = offset, child = logicalPlan).validate(tableEnv))
+  }
+
+  /**
+    * Limits a sorted result to a specified number of records from an offset position.
+    * Similar to a SQL LIMIT clause. Limit is technically part of the Order By operator and
+    * thus must be preceded by it.
+    *
+    * Example:
+    *
+    * {{{
+    *   // returns 5 records beginning with the 4th record
+    *   tab.orderBy('name.desc).limit(3, 5)
+    * }}}
+    *
+    * @param offset number of records to skip
+    * @param fetch number of records to be returned
+    */
   def limit(offset: Int, fetch: Int): Table = {
-    if (offset < 0 || fetch < 1) {
-      throw new ValidationException(
-        "Offset should be greater than or equal to zero and" +
-          " fetch should be greater than or equal to one.")
-    }
     new Table(tableEnv, Limit(offset, fetch, logicalPlan).validate(tableEnv))
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd7a114/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala
index 7c18e14..f345984 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala
@@ -25,7 +25,7 @@ import org.apache.flink.api.scala.batch.utils.SortTestUtils._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala._
-import org.apache.flink.api.table.{TableException, Row, TableEnvironment}
+import org.apache.flink.api.table.{Row, TableEnvironment, TableException}
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit._
@@ -102,4 +102,38 @@ class SortITCase(
     TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
   }
 
+  @Test
+  def testOrderByLimit(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 LIMIT 5"
+
+    implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+      x.productElement(0).asInstanceOf[Int])
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    tEnv.registerDataSet("MyTable", ds)
+
+    val expected = sortExpectedly(tupleDataSetStrings, 0, 5)
+    val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+
+    val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
+
+    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testLimitWithoutOrder(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery = "SELECT * FROM MyTable LIMIT 5"
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    tEnv.registerDataSet("MyTable", ds)
+
+    tEnv.sql(sqlQuery).toDataSet[Row].collect()
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd7a114/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala
index c4a5a74..d4a1d8d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala
@@ -18,13 +18,13 @@
 
 package org.apache.flink.api.scala.batch.table
 
+import org.apache.flink.api.scala.batch.utils.SortTestUtils._
 import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
 import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.api.scala.batch.utils.SortTestUtils._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.scala.{ExecutionEnvironment, _}
-import org.apache.flink.api.table.{Row, TableEnvironment}
+import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException}
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit._
@@ -135,4 +135,33 @@ class SortITCase(
     TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
   }
 
+  @Test
+  def testOrderByFetch(): Unit = {
+    val env = getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    val t = ds.toTable(tEnv).orderBy('_1.asc).limit(0, 5)
+    implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+      x.productElement(0).asInstanceOf[Int])
+
+    val expected = sortExpectedly(tupleDataSetStrings, 0, 5)
+    val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+
+    val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
+
+    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testFetchWithoutOrder(): Unit = {
+    val env = getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    val t = ds.toTable(tEnv).limit(0, 5)
+
+    t.toDataSet[Row].collect()
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd7a114/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala
index 89b0fdc..8ce1472 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala
@@ -109,4 +109,12 @@ class UnsupportedOpsTest extends StreamingMultipleProgramsTestBase {
     val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
     t1.minusAll(t2)
   }
+
+  @Test(expected = classOf[ValidationException])
+  def testLimit(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
+    t1.limit(0,5)
+  }
 }


[2/2] flink git commit: [FLINK-3940] [table] Add support for ORDER BY OFFSET FETCH

Posted by tw...@apache.org.
[FLINK-3940] [table] Add support for ORDER BY OFFSET FETCH

This closes #2282.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0472cb9b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0472cb9b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0472cb9b

Branch: refs/heads/master
Commit: 0472cb9ba44de7223fffbc0eca0232d558730772
Parents: dc50625
Author: gallenvara <ga...@126.com>
Authored: Fri Jul 22 11:39:46 2016 +0800
Committer: twalthr <tw...@apache.org>
Committed: Thu Aug 11 10:27:11 2016 +0200

----------------------------------------------------------------------
 docs/apis/table.md                              | 17 +++++++-
 .../api/table/plan/logical/operators.scala      | 23 +++++++++-
 .../table/plan/nodes/dataset/DataSetSort.scala  | 33 ++++++++++++---
 .../plan/rules/dataSet/DataSetSortRule.scala    | 25 ++---------
 .../table/runtime/CountPartitionFunction.scala  | 38 +++++++++++++++++
 .../api/table/runtime/LimitFilterFunction.scala | 44 ++++++++++++++++++++
 .../org/apache/flink/api/table/table.scala      | 43 +++++++++++++++++++
 .../flink/api/scala/batch/sql/SortITCase.scala  | 34 +++++++++++----
 .../api/scala/batch/table/SortITCase.scala      | 36 ++++++++++++++++
 .../api/scala/batch/utils/SortTestUtils.scala   | 13 +++++-
 10 files changed, 267 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0472cb9b/docs/apis/table.md
----------------------------------------------------------------------
diff --git a/docs/apis/table.md b/docs/apis/table.md
index cb56656..6793fde 100644
--- a/docs/apis/table.md
+++ b/docs/apis/table.md
@@ -687,6 +687,22 @@ Table result = in.orderBy("a.asc");
       </td>
     </tr>
 
+    <tr>
+      <td><strong>Limit</strong></td>
+      <td>
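+        <p>Similar to a SQL LIMIT clause. Skips the given number of rows (offset) of a sorted result and, optionally, returns at most a given number of the rows that follow (fetch). It is technically part of the ORDER BY clause and must therefore be applied directly after <code>orderBy</code>.</p>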
+{% highlight java %}
+Table in = tableEnv.fromDataSet(ds, "a, b, c");
+Table result = in.orderBy("a.asc").limit(3);
+{% endhighlight %}
+or
+{% highlight java %}
+Table in = tableEnv.fromDataSet(ds, "a, b, c");
+Table result = in.orderBy("a.asc").limit(3, 5);
+{% endhighlight %}
+      </td>
+    </tr>
+
   </tbody>
 </table>
 
@@ -1009,7 +1025,6 @@ Among others, the following SQL features are not supported, yet:
 - Timestamps are limited to milliseconds precision
 - Distinct aggregates (e.g., `COUNT(DISTINCT name)`)
 - Non-equi joins and Cartesian products
-- Result selection by order position (`ORDER BY OFFSET FETCH`)
 - Grouping sets
 
 *Note: Tables are joined in the order in which they are specified in the `FROM` clause. In some cases the table order must be manually tweaked to resolve Cartesian products.*

http://git-wip-us.apache.org/repos/asf/flink/blob/0472cb9b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
index ad8618c..0d4cf2c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
@@ -19,8 +19,8 @@ package org.apache.flink.api.table.plan.logical
 
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.logical.LogicalProject
-import org.apache.calcite.rex.{RexInputRef, RexNode}
+import org.apache.calcite.rel.logical.{LogicalSort, LogicalProject}
+import org.apache.calcite.rex.{RexLiteral, RexInputRef, RexNode}
 import org.apache.calcite.tools.RelBuilder
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -150,6 +150,25 @@ case class Sort(order: Seq[Ordering], child: LogicalNode) extends UnaryNode {
   }
 }
 
+case class Limit(offset: Int, fetch: Int, child: LogicalNode) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+
+  override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
+    child.construct(relBuilder)
+    relBuilder.limit(offset, fetch)
+  }
+
+  override def validate(tableEnv: TableEnvironment): LogicalNode = {
+    if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
+      throw new TableException(s"Limit on stream tables is currently not supported.")
+    }
+    if (!child.validate(tableEnv).isInstanceOf[Sort]) {
+      throw new TableException(s"Limit operator must follow behind orderBy clause.")
+    }
+    super.validate(tableEnv)
+  }
+}
+
 case class Filter(condition: Expression, child: LogicalNode) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0472cb9b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
index 1af03d8..ef3005c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
@@ -24,10 +24,12 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.RelFieldCollation.Direction
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rex.{RexLiteral, RexNode}
 import org.apache.flink.api.common.operators.Order
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.table.BatchTableEnvironment
+import org.apache.flink.api.table.runtime.{LimitFilterFunction, CountPartitionFunction}
 import org.apache.flink.api.table.typeutils.TypeConverter._
 
 import scala.collection.JavaConverters._
@@ -37,7 +39,9 @@ class DataSetSort(
     traitSet: RelTraitSet,
     inp: RelNode,
     collations: RelCollation,
-    rowType2: RelDataType)
+    rowType2: RelDataType,
+    offset: RexNode,             
+    fetch: RexNode)
   extends SingleRel(cluster, traitSet, inp)
   with DataSetRel{
 
@@ -47,7 +51,9 @@ class DataSetSort(
       traitSet,
       inputs.get(0),
       collations,
-      rowType2
+      rowType2,
+      offset,
+      fetch
     )
   }
 
@@ -71,11 +77,28 @@ class DataSetSort(
       partitionedDs = partitionedDs.sortPartition(fieldCollation._1, fieldCollation._2)
     }
 
+    val limitedDS = if (offset == null && fetch == null) {
+      partitionedDs
+    } else {
+      val limitStart = if (offset != null) RexLiteral.intValue(offset) else 0
+      val limitEnd = if (fetch != null) RexLiteral.intValue(fetch) + limitStart else Int.MaxValue
+
+      val countFunction = new CountPartitionFunction[Any]
+      val partitionCount = partitionedDs.mapPartition(countFunction)
+
+      val limitFunction = new LimitFilterFunction[Any](
+        limitStart,
+        limitEnd,
+        "countPartition")
+      partitionedDs.filter(limitFunction).withBroadcastSet(partitionCount, "countPartition")
+    }
+
+
     val inputType = partitionedDs.getType
     expectedType match {
 
       case None if config.getEfficientTypeUsage =>
-        partitionedDs
+        limitedDS
 
       case _ =>
         val determinedType = determineReturnType(
@@ -96,11 +119,11 @@ class DataSetSort(
             getRowType.getFieldNames.asScala
           )
 
-          partitionedDs.map(mapFunc)
+          limitedDS.map(mapFunc)
         }
         // no conversion necessary, forward
         else {
-          partitionedDs
+          limitedDS
         }
     }
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/0472cb9b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala
index b26d1de..5c1fb53 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala
@@ -18,12 +18,10 @@
 
 package org.apache.flink.api.table.plan.rules.dataSet
 
-import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.core.JoinRelType
 import org.apache.calcite.rel.logical.{LogicalJoin, LogicalSort}
-import org.apache.flink.api.table.TableException
 import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetSort}
 
 class DataSetSortRule
@@ -33,23 +31,6 @@ class DataSetSortRule
     DataSetConvention.INSTANCE,
     "DataSetSortRule") {
 
-  /**
-    * Only translate when no OFFSET or LIMIT specified
-    */
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val sort = call.rel(0).asInstanceOf[LogicalSort]
-
-    if (sort.offset != null) {
-      throw new TableException("ORDER BY OFFSET is currently not supported.")
-    }
-
-    if (sort.fetch != null) {
-      throw new TableException("ORDER BY FETCH is currently not supported.")
-    }
-
-    sort.offset == null && sort.fetch == null
-  }
-
   override def convert(rel: RelNode): RelNode = {
 
     val sort: LogicalSort = rel.asInstanceOf[LogicalSort]
@@ -61,7 +42,9 @@ class DataSetSortRule
       traitSet,
       convInput,
       sort.getCollation,
-      rel.getRowType
+      rel.getRowType,
+      sort.offset,
+      sort.fetch
     )
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0472cb9b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/CountPartitionFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/CountPartitionFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/CountPartitionFunction.scala
new file mode 100644
index 0000000..79b8623
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/CountPartitionFunction.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.RichMapPartitionFunction
+import org.apache.flink.util.Collector
+
+class CountPartitionFunction[IN] extends RichMapPartitionFunction[IN, (Int, Int)] {
+  var elementCount = 0
+
+  override def mapPartition(value: Iterable[IN], out: Collector[(Int, Int)]): Unit = {
+    val partitionIndex = getRuntimeContext.getIndexOfThisSubtask
+    val iterator = value.iterator()
+    while (iterator.hasNext) {
+      elementCount += 1
+      iterator.next()
+    }
+    out.collect(partitionIndex, elementCount)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0472cb9b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/LimitFilterFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/LimitFilterFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/LimitFilterFunction.scala
new file mode 100644
index 0000000..311b616
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/LimitFilterFunction.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime
+
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.configuration.Configuration
+
+import scala.collection.mutable
+import scala.collection.JavaConverters._
+
+class LimitFilterFunction[T](limitStart: Int,
+                             limitEnd: Int,
+                             broadcast: String) extends RichFilterFunction[T] {
+  var elementCount = 0
+  var countList = mutable.Buffer[Int]()
+
+  override def open(config: Configuration) {
+    countList = getRuntimeContext.getBroadcastVariable[(Int, Int)](broadcast).asScala
+      .sortWith(_._1 < _._1).map(_._2).scanLeft(0) (_ + _)
+  }
+
+  override def filter(value: T): Boolean = {
+    val partitionIndex = getRuntimeContext.getIndexOfThisSubtask
+    elementCount += 1
+    limitStart - countList(partitionIndex) < elementCount &&
+      limitEnd - countList(partitionIndex) >= elementCount
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0472cb9b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
index cbb9a07..c9fd78c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
@@ -572,6 +572,49 @@ class Table(
   }
 
   /**
+   * Skips the first `offset` rows of a sorted table and returns all remaining rows.
+   * Limit is technically part of the ORDER BY clause, so it must directly follow an
+   * orderBy() call.
+   *
+   * Example:
+   *
+   * {{{
+   *   tab.orderBy('name.desc).limit(3)
+   * }}}
+   * 
+   * @param offset  The number of rows to skip before including them in the result.
+   */
+  def limit(offset: Int): Table = {
+    if (offset < 0) {
+      throw new ValidationException("Offset should be greater than or equal to zero.")
+    }
+    new Table(tableEnv, Limit(offset, -1, logicalPlan).validate(tableEnv))
+  }
+
+  /**
+   * Skips the first `offset` rows of a sorted table and returns at most `fetch` of the
+   * following rows. Limit is technically part of the ORDER BY clause, so it must directly
+   * follow an orderBy() call.
+   *
+   * Example:
+   *
+   * {{{
+   *   tab.orderBy('name.desc).limit(3, 5)
+   * }}}
+   *
+   * @param offset The number of rows to skip before including them in the result.
+   * @param fetch The maximum number of rows to return after skipping the offset.
+   */
+  def limit(offset: Int, fetch: Int): Table = {
+    if (offset < 0 || fetch < 1) {
+      throw new ValidationException(
+        "Offset should be greater than or equal to zero and" +
+          " fetch should be greater than or equal to one.")
+    }
+    new Table(tableEnv, Limit(offset, fetch, logicalPlan).validate(tableEnv))
+  }
+
+  /**
     * Writes the [[Table]] to a [[TableSink]]. A [[TableSink]] defines an external storage location.
     *
     * A batch [[Table]] can only be written to a

http://git-wip-us.apache.org/repos/asf/flink/blob/0472cb9b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala
index 858f75a..7c18e14 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala
@@ -60,28 +60,46 @@ class SortITCase(
     TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
   }
 
-  @Test(expected = classOf[TableException])
-  def testOrderByOffset(): Unit = {
+  @Test
+  def testOrderByWithOffset(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
-    val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 OFFSET 2 ROWS"
+    val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 DESC OFFSET 2 ROWS"
+
+    implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+      -x.productElement(0).asInstanceOf[Int])
 
     val ds = CollectionDataSets.get3TupleDataSet(env)
     tEnv.registerDataSet("MyTable", ds)
-    tEnv.sql(sqlQuery).toDataSet[Row]
+
+    val expected = sortExpectedly(tupleDataSetStrings, 2, 21)
+    val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+
+    val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
+
+    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
   }
 
-  @Test(expected = classOf[TableException])
-  def testOrderByFirst(): Unit = {
+  @Test
+  def testOrderByWithOffsetAndFetch(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
-    val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 FETCH NEXT 2 ROWS ONLY"
+    val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 OFFSET 2 ROWS FETCH NEXT 5 ROWS ONLY"
+
+    implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+      x.productElement(0).asInstanceOf[Int])
 
     val ds = CollectionDataSets.get3TupleDataSet(env)
     tEnv.registerDataSet("MyTable", ds)
-    tEnv.sql(sqlQuery).toDataSet[Row]
+
+    val expected = sortExpectedly(tupleDataSetStrings, 2, 7)
+    val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+
+    val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
+
+    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0472cb9b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala
index 235fc45..c4a5a74 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala
@@ -99,4 +99,40 @@ class SortITCase(
     TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
   }
 
+  @Test
+  def testOrderByOffset(): Unit = {
+    val env = getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    val t = ds.toTable(tEnv).orderBy('_1.asc).limit(3)
+    implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+      x.productElement(0).asInstanceOf[Int])
+
+    val expected = sortExpectedly(tupleDataSetStrings, 3, 21)
+    val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+
+    val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
+
+    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
+  }
+
+  @Test
+  def testOrderByOffsetAndFetch(): Unit = {
+    val env = getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    val t = ds.toTable(tEnv).orderBy('_1.asc).limit(3, 5)
+    implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+      x.productElement(0).asInstanceOf[Int])
+
+    val expected = sortExpectedly(tupleDataSetStrings, 3, 8)
+    val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+
+    val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
+
+    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0472cb9b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/SortTestUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/SortTestUtils.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/SortTestUtils.scala
index 07765fd..8d1f653 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/SortTestUtils.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/SortTestUtils.scala
@@ -42,8 +42,17 @@ object SortTestUtils {
     ,(20, 6L, "Comment#14")
     ,(21, 6L, "Comment#15"))
 
-  def sortExpectedly(dataSet: List[Product])(implicit ordering: Ordering[Product]): String = {
-    dataSet.sorted(ordering).mkString("\n").replaceAll("[\\(\\)]", "")
+  def sortExpectedly(dataSet: List[Product])
+                    (implicit ordering: Ordering[Product]): String = 
+    sortExpectedly(dataSet, 0, dataSet.length)
+
+  def sortExpectedly(dataSet: List[Product], start: Int, end: Int)
+                    (implicit ordering: Ordering[Product]): String = {
+    dataSet
+      .sorted(ordering)
+      .slice(start, end)
+      .mkString("\n")
+      .replaceAll("[\\(\\)]", "")
   }
 
 }