Posted to commits@flink.apache.org by tw...@apache.org on 2016/08/11 08:28:10 UTC
[1/2] flink git commit: [FLINK-3940] [table] Additional improvements
Repository: flink
Updated Branches:
refs/heads/master dc5062557 -> bdd7a114d
[FLINK-3940] [table] Additional improvements
- Improve overflow handling (support for more records than Int.MAX)
- SQL LIMIT support
- Bug fixing and improved docs
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bdd7a114
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bdd7a114
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bdd7a114
Branch: refs/heads/master
Commit: bdd7a114d9411e2bda51ad296061c5fca742dc8b
Parents: 0472cb9
Author: twalthr <tw...@apache.org>
Authored: Wed Aug 10 23:49:27 2016 +0200
Committer: twalthr <tw...@apache.org>
Committed: Thu Aug 11 10:27:11 2016 +0200
----------------------------------------------------------------------
docs/apis/table.md | 33 ++++++--
.../api/table/plan/logical/operators.scala | 45 +++++-----
.../table/plan/nodes/dataset/DataSetSort.scala | 86 ++++++++++++++------
.../table/runtime/CountPartitionFunction.scala | 10 ++-
.../api/table/runtime/LimitFilterFunction.scala | 42 +++++++---
.../org/apache/flink/api/table/table.scala | 70 ++++++++--------
.../flink/api/scala/batch/sql/SortITCase.scala | 36 +++++++-
.../api/scala/batch/table/SortITCase.scala | 33 +++++++-
.../scala/stream/table/UnsupportedOpsTest.scala | 8 ++
9 files changed, 254 insertions(+), 109 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/bdd7a114/docs/apis/table.md
----------------------------------------------------------------------
diff --git a/docs/apis/table.md b/docs/apis/table.md
index 6793fde..57252d9 100644
--- a/docs/apis/table.md
+++ b/docs/apis/table.md
@@ -668,7 +668,7 @@ Table result = left.minusAll(right);
<tr>
<td><strong>Distinct</strong></td>
<td>
- <p>Similar to a SQL DISTINCT clause. Returns rows with distinct value combinations.</p>
+ <p>Similar to a SQL DISTINCT clause. Returns records with distinct value combinations.</p>
{% highlight java %}
Table in = tableEnv.fromDataSet(ds, "a, b, c");
Table result = in.distinct();
@@ -679,7 +679,7 @@ Table result = in.distinct();
<tr>
<td><strong>Order By</strong></td>
<td>
- <p>Similar to a SQL ORDER BY clause. Returns rows globally sorted across all parallel partitions.</p>
+ <p>Similar to a SQL ORDER BY clause. Returns records globally sorted across all parallel partitions.</p>
{% highlight java %}
Table in = tableEnv.fromDataSet(ds, "a, b, c");
Table result = in.orderBy("a.asc");
@@ -690,15 +690,15 @@ Table result = in.orderBy("a.asc");
<tr>
<td><strong>Limit</strong></td>
<td>
- <p>Similar to a SQL LIMIT clause. Returns specified number of rows from offset position. It is technically part of the ORDER BY clause.</p>
+ <p>Similar to a SQL LIMIT clause. Limits a sorted result to a specified number of records from an offset position. Limit is technically part of the Order By operator and thus must be preceded by it.</p>
{% highlight java %}
Table in = tableEnv.fromDataSet(ds, "a, b, c");
-Table result = in.orderBy("a.asc").limit(3);
+Table result = in.orderBy("a.asc").limit(3); // returns an unlimited number of records beginning with the 4th record
{% endhighlight %}
or
{% highlight java %}
Table in = tableEnv.fromDataSet(ds, "a, b, c");
-Table result = in.orderBy("a.asc").limit(3, 5);
+Table result = in.orderBy("a.asc").limit(3, 5); // returns 5 records beginning with the 4th record
{% endhighlight %}
</td>
</tr>
@@ -890,7 +890,7 @@ val result = left.minusAll(right);
<tr>
<td><strong>Distinct</strong></td>
<td>
- <p>Similar to a SQL DISTINCT clause. Returns rows with distinct value combinations.</p>
+ <p>Similar to a SQL DISTINCT clause. Returns records with distinct value combinations.</p>
{% highlight scala %}
val in = ds.toTable(tableEnv, 'a, 'b, 'c);
val result = in.distinct();
@@ -901,7 +901,7 @@ val result = in.distinct();
<tr>
<td><strong>Order By</strong></td>
<td>
- <p>Similar to a SQL ORDER BY clause. Returns rows globally sorted across all parallel partitions.</p>
+ <p>Similar to a SQL ORDER BY clause. Returns records globally sorted across all parallel partitions.</p>
{% highlight scala %}
val in = ds.toTable(tableEnv, 'a, 'b, 'c);
val result = in.orderBy('a.asc);
@@ -909,6 +909,22 @@ val result = in.orderBy('a.asc);
</td>
</tr>
+ <tr>
+ <td><strong>Limit</strong></td>
+ <td>
+ <p>Similar to a SQL LIMIT clause. Limits a sorted result to a specified number of records from an offset position. Limit is technically part of the Order By operator and thus must be preceded by it.</p>
+{% highlight scala %}
+val in = ds.toTable(tableEnv, 'a, 'b, 'c);
+val result = in.orderBy('a.asc).limit(3); // returns an unlimited number of records beginning with the 4th record
+{% endhighlight %}
+or
+{% highlight scala %}
+val in = ds.toTable(tableEnv, 'a, 'b, 'c);
+val result = in.orderBy('a.asc).limit(3, 5); // returns 5 records beginning with the 4th record
+{% endhighlight %}
+ </td>
+ </tr>
+
</tbody>
</table>
</div>
@@ -1087,6 +1103,9 @@ query:
| query INTERSECT query
}
[ ORDER BY orderItem [, orderItem ]* ]
+ [ LIMIT { count | ALL } ]
+ [ OFFSET start { ROW | ROWS } ]
+ [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY ]
orderItem:
expression [ ASC | DESC ]
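With this grammar in place, LIMIT, OFFSET and FETCH can all be exercised through the SQL entry point. A minimal sketch, assuming a table registered as "MyTable" with an INT column _1 (mirroring the commit's tests):

    val limited = tEnv.sql("SELECT * FROM MyTable ORDER BY _1 LIMIT 5")
    val paged = tEnv.sql("SELECT * FROM MyTable ORDER BY _1 OFFSET 2 ROWS FETCH NEXT 5 ROWS ONLY")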
http://git-wip-us.apache.org/repos/asf/flink/blob/bdd7a114/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
index 0d4cf2c..79c3cb1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
@@ -19,8 +19,8 @@ package org.apache.flink.api.table.plan.logical
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.logical.{LogicalSort, LogicalProject}
-import org.apache.calcite.rex.{RexLiteral, RexInputRef, RexNode}
+import org.apache.calcite.rel.logical.LogicalProject
+import org.apache.calcite.rex.{RexInputRef, RexNode}
import org.apache.calcite.tools.RelBuilder
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -40,9 +40,9 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extend
val newProjectList =
afterResolve.projectList.zipWithIndex.map { case (e, i) =>
e match {
- case u @ UnresolvedAlias(child) => child match {
+ case u @ UnresolvedAlias(c) => c match {
case ne: NamedExpression => ne
- case e if !e.valid => u
+ case expr if !expr.valid => u
case c @ Cast(ne: NamedExpression, tp) => Alias(c, s"${ne.name}-$tp")
case other => Alias(other, s"_c$i")
}
@@ -62,14 +62,14 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extend
case n: Alias =>
// explicit name
if (names.contains(n.name)) {
- throw new ValidationException(s"Duplicate field name $n.name.")
+ throw ValidationException(s"Duplicate field name $n.name.")
} else {
names.add(n.name)
}
case r: ResolvedFieldReference =>
// simple field forwarding
if (names.contains(r.name)) {
- throw new ValidationException(s"Duplicate field name $r.name.")
+ throw ValidationException(s"Duplicate field name $r.name.")
} else {
names.add(r.name)
}
@@ -98,10 +98,10 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extend
case class AliasNode(aliasList: Seq[Expression], child: LogicalNode) extends UnaryNode {
override def output: Seq[Attribute] =
- throw new UnresolvedException("Invalid call to output on AliasNode")
+ throw UnresolvedException("Invalid call to output on AliasNode")
override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder =
- throw new UnresolvedException("Invalid call to toRelNode on AliasNode")
+ throw UnresolvedException("Invalid call to toRelNode on AliasNode")
override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = {
if (aliasList.length > child.output.length) {
@@ -150,7 +150,7 @@ case class Sort(order: Seq[Ordering], child: LogicalNode) extends UnaryNode {
}
}
-case class Limit(offset: Int, fetch: Int, child: LogicalNode) extends UnaryNode {
+case class Limit(offset: Int, fetch: Int = -1, child: LogicalNode) extends UnaryNode {
override def output: Seq[Attribute] = child.output
override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
@@ -160,10 +160,13 @@ case class Limit(offset: Int, fetch: Int, child: LogicalNode) extends UnaryNode
override def validate(tableEnv: TableEnvironment): LogicalNode = {
if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
- throw new TableException(s"Limit on stream tables is currently not supported.")
+ failValidation(s"Limit on stream tables is currently not supported.")
}
if (!child.validate(tableEnv).isInstanceOf[Sort]) {
- throw new TableException(s"Limit operator must follow behind orderBy clause.")
+ failValidation(s"Limit operator must be preceded by an OrderBy operator.")
+ }
+ if (offset < 0) {
+ failValidation(s"Offset should be greater than or equal to zero.")
}
super.validate(tableEnv)
}
@@ -193,11 +196,9 @@ case class Aggregate(
child: LogicalNode) extends UnaryNode {
override def output: Seq[Attribute] = {
- (groupingExpressions ++ aggregateExpressions) map { agg =>
- agg match {
- case ne: NamedExpression => ne.toAttribute
- case e => Alias(e, e.toString).toAttribute
- }
+ (groupingExpressions ++ aggregateExpressions) map {
+ case ne: NamedExpression => ne.toAttribute
+ case e => Alias(e, e.toString).toAttribute
}
}
@@ -205,11 +206,9 @@ case class Aggregate(
child.construct(relBuilder)
relBuilder.aggregate(
relBuilder.groupKey(groupingExpressions.map(_.toRexNode(relBuilder)).asJava),
- aggregateExpressions.map { e =>
- e match {
- case Alias(agg: Aggregation, name) => agg.toAggCall(name)(relBuilder)
- case _ => throw new RuntimeException("This should never happen.")
- }
+ aggregateExpressions.map {
+ case Alias(agg: Aggregation, name) => agg.toAggCall(name)(relBuilder)
+ case _ => throw new RuntimeException("This should never happen.")
}.asJava)
}
@@ -403,7 +402,7 @@ case class Join(
right)
}
val resolvedCondition = node.condition.map(_.postOrderTransform(partialFunction))
- new Join(node.left, node.right, node.joinType, resolvedCondition)
+ Join(node.left, node.right, node.joinType, resolvedCondition)
}
override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
@@ -429,7 +428,7 @@ case class Join(
failValidation(s"join relations with ambiguous names: ${ambiguousName.mkString(", ")}")
}
- resolvedJoin.condition.foreach(testJoinCondition(_))
+ resolvedJoin.condition.foreach(testJoinCondition)
resolvedJoin
}
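The validation above is what the Table API surfaces to users. A sketch of the three paths, assuming a batch table t with a sortable field 'a:

    t.limit(3)                   // fails validation: Limit must be preceded by an OrderBy operator
    t.orderBy('a.asc).limit(-1)  // fails validation: offset must be greater than or equal to zero
    t.orderBy('a.asc).limit(3)   // valid: builds Limit(offset = 3, fetch = -1) on top of Sort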
http://git-wip-us.apache.org/repos/asf/flink/blob/bdd7a114/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
index ef3005c..22930e7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
@@ -28,8 +28,8 @@ import org.apache.calcite.rex.{RexLiteral, RexNode}
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.BatchTableEnvironment
-import org.apache.flink.api.table.runtime.{LimitFilterFunction, CountPartitionFunction}
+import org.apache.flink.api.table.{BatchTableEnvironment, TableException}
+import org.apache.flink.api.table.runtime.{CountPartitionFunction, LimitFilterFunction}
import org.apache.flink.api.table.typeutils.TypeConverter._
import scala.collection.JavaConverters._
@@ -40,12 +40,24 @@ class DataSetSort(
inp: RelNode,
collations: RelCollation,
rowType2: RelDataType,
- offset: RexNode,
+ offset: RexNode,
fetch: RexNode)
extends SingleRel(cluster, traitSet, inp)
- with DataSetRel{
+ with DataSetRel {
- override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode ={
+ private val limitStart: Long = if (offset != null) {
+ RexLiteral.intValue(offset)
+ } else {
+ 0L
+ }
+
+ private val limitEnd: Long = if (fetch != null) {
+ RexLiteral.intValue(fetch) + limitStart
+ } else {
+ Long.MaxValue
+ }
+
+ override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
new DataSetSort(
cluster,
traitSet,
@@ -58,18 +70,24 @@ class DataSetSort(
}
override def translateToPlan(
- tableEnv: BatchTableEnvironment,
- expectedType: Option[TypeInformation[Any]] = None): DataSet[Any] = {
+ tableEnv: BatchTableEnvironment,
+ expectedType: Option[TypeInformation[Any]] = None)
+ : DataSet[Any] = {
+
+ if (fieldCollations.isEmpty) {
+ throw TableException("Limiting the result without sorting is not allowed " +
+ "as it could lead to arbitrary results.")
+ }
val config = tableEnv.getConfig
- val inputDS = inp.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+ val inputDs = inp.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
- val currentParallelism = inputDS.getExecutionEnvironment.getParallelism
+ val currentParallelism = inputDs.getExecutionEnvironment.getParallelism
var partitionedDs = if (currentParallelism == 1) {
- inputDS
+ inputDs
} else {
- inputDS.partitionByRange(fieldCollations.map(_._1): _*)
+ inputDs.partitionByRange(fieldCollations.map(_._1): _*)
.withOrders(fieldCollations.map(_._2): _*)
}
@@ -77,28 +95,37 @@ class DataSetSort(
partitionedDs = partitionedDs.sortPartition(fieldCollation._1, fieldCollation._2)
}
- val limitedDS = if (offset == null && fetch == null) {
+ val limitedDs = if (offset == null && fetch == null) {
partitionedDs
} else {
- val limitStart = if (offset != null) RexLiteral.intValue(offset) else 0
- val limitEnd = if (fetch != null) RexLiteral.intValue(fetch) + limitStart else Int.MaxValue
-
val countFunction = new CountPartitionFunction[Any]
- val partitionCount = partitionedDs.mapPartition(countFunction)
+
+ val partitionCountName = "prepare offset/fetch"
+
+ val partitionCount = partitionedDs
+ .mapPartition(countFunction)
+ .name(partitionCountName)
+
+ val broadcastName = "countPartition"
val limitFunction = new LimitFilterFunction[Any](
limitStart,
limitEnd,
- "countPartition")
- partitionedDs.filter(limitFunction).withBroadcastSet(partitionCount, "countPartition")
- }
+ broadcastName)
+
+ val limitName = s"offset: $offsetToString, fetch: $fetchToString"
+ partitionedDs
+ .filter(limitFunction)
+ .name(limitName)
+ .withBroadcastSet(partitionCount, broadcastName)
+ }
val inputType = partitionedDs.getType
expectedType match {
case None if config.getEfficientTypeUsage =>
- limitedDS
+ limitedDs
case _ =>
val determinedType = determineReturnType(
@@ -119,11 +146,13 @@ class DataSetSort(
getRowType.getFieldNames.asScala
)
- limitedDS.map(mapFunc)
+ val opName = s"convert: (${rowType.getFieldNames.asScala.toList.mkString(", ")})"
+
+ limitedDs.map(mapFunc).name(opName)
}
// no conversion necessary, forward
else {
- limitedDS
+ limitedDs
}
}
}
@@ -143,10 +172,21 @@ class DataSetSort(
private val sortFieldsToString = fieldCollations
.map(col => s"${rowType2.getFieldNames.get(col._1)} ${col._2.getShortName}" ).mkString(", ")
- override def toString: String = s"Sort(by: $sortFieldsToString)"
+ private val offsetToString = s"$offset"
+
+ private val fetchToString = if (limitEnd == Long.MaxValue) {
+ "unlimited"
+ } else {
+ s"$limitEnd"
+ }
+
+ override def toString: String =
+ s"Sort(by: ($sortFieldsToString), offset: $offsetToString, fetch: $fetchToString)"
override def explainTerms(pw: RelWriter) : RelWriter = {
super.explainTerms(pw)
.item("orderBy", sortFieldsToString)
+ .item("offset", offsetToString)
+ .item("fetch", fetchToString)
}
}
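The limitStart/limitEnd fields introduced above reduce the four null/non-null combinations of offset and fetch to a single pair of longs. A standalone sketch of that mapping (Option stands in for the nullable RexNode literals):

    def limitBounds(offset: Option[Int], fetch: Option[Int]): (Long, Long) = {
      val limitStart: Long = offset.map(_.toLong).getOrElse(0L)               // no offset -> 0
      val limitEnd: Long = fetch.map(_ + limitStart).getOrElse(Long.MaxValue) // no fetch -> unlimited
      (limitStart, limitEnd)
    }
    // limitBounds(Some(3), Some(5)) == (3L, 8L)            -> keep global records 4 to 8
    // limitBounds(Some(3), None)    == (3L, Long.MaxValue) -> keep everything after record 3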
http://git-wip-us.apache.org/repos/asf/flink/blob/bdd7a114/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/CountPartitionFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/CountPartitionFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/CountPartitionFunction.scala
index 79b8623..5896f4c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/CountPartitionFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/CountPartitionFunction.scala
@@ -23,14 +23,16 @@ import java.lang.Iterable
import org.apache.flink.api.common.functions.RichMapPartitionFunction
import org.apache.flink.util.Collector
-class CountPartitionFunction[IN] extends RichMapPartitionFunction[IN, (Int, Int)] {
- var elementCount = 0
+class CountPartitionFunction[IN] extends RichMapPartitionFunction[IN, (Int, Long)] {
- override def mapPartition(value: Iterable[IN], out: Collector[(Int, Int)]): Unit = {
+ override def mapPartition(value: Iterable[IN], out: Collector[(Int, Long)]): Unit = {
val partitionIndex = getRuntimeContext.getIndexOfThisSubtask
+ var elementCount = 0L
val iterator = value.iterator()
while (iterator.hasNext) {
- elementCount += 1
+ if (elementCount != Long.MaxValue) { // prevent overflow
+ elementCount += 1L
+ }
iterator.next()
}
out.collect(partitionIndex, elementCount)
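Each parallel subtask thus emits exactly one (partitionIndex, count) pair, now as a Long so partitions larger than Int.MaxValue are counted correctly. A tiny illustration with assumed partition sizes:

    val partitionSizes = Seq(7L, 6L, 8L) // hypothetical sizes of three sorted partitions
    val counts: Seq[(Int, Long)] = partitionSizes.zipWithIndex.map { case (n, i) => (i, n) }
    // counts == Seq((0,7L), (1,6L), (2,8L)) -- broadcast to every LimitFilterFunction subtask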
http://git-wip-us.apache.org/repos/asf/flink/blob/bdd7a114/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/LimitFilterFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/LimitFilterFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/LimitFilterFunction.scala
index 311b616..5ec9035 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/LimitFilterFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/LimitFilterFunction.scala
@@ -21,24 +21,44 @@ package org.apache.flink.api.table.runtime
import org.apache.flink.api.common.functions.RichFilterFunction
import org.apache.flink.configuration.Configuration
-import scala.collection.mutable
import scala.collection.JavaConverters._
-class LimitFilterFunction[T](limitStart: Int,
- limitEnd: Int,
- broadcast: String) extends RichFilterFunction[T] {
- var elementCount = 0
- var countList = mutable.Buffer[Int]()
+
+class LimitFilterFunction[T](
+ limitStart: Long,
+ limitEnd: Long,
+ broadcastName: String)
+ extends RichFilterFunction[T] {
+
+ var partitionIndex: Int = _
+ var elementCount: Long = _
+ var countList: Array[Long] = _
override def open(config: Configuration) {
- countList = getRuntimeContext.getBroadcastVariable[(Int, Int)](broadcast).asScala
- .sortWith(_._1 < _._1).map(_._2).scanLeft(0) (_ + _)
+ partitionIndex = getRuntimeContext.getIndexOfThisSubtask
+
+ val countPartitionResult = getRuntimeContext
+ .getBroadcastVariable[(Int, Long)](broadcastName)
+ .asScala
+
+ // sort by partition index, extract number per partition, sum with intermediate results
+ countList = countPartitionResult.sortWith(_._1 < _._1).map(_._2).scanLeft(0L) { case (a, b) =>
+ val sum = a + b
+ if (sum < 0L) { // prevent overflow by saturating at Long.MaxValue
+ Long.MaxValue
+ } else {
+ sum
+ }
+ }.toArray
+
+ elementCount = 0
}
override def filter(value: T): Boolean = {
- val partitionIndex = getRuntimeContext.getIndexOfThisSubtask
- elementCount += 1
+ if (elementCount != Long.MaxValue) { // prevent overflow
+ elementCount += 1L
+ }
+ // we filter out records that are not within the limit (Long.MaxValue is unlimited)
limitStart - countList(partitionIndex) < elementCount &&
- limitEnd - countList(partitionIndex) >= elementCount
+ (limitEnd == Long.MaxValue || limitEnd - countList(partitionIndex) >= elementCount)
}
}
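Putting the two functions together: the broadcast counts become a prefix-sum array, and a record passes the filter iff its one-based global position lies in (limitStart, limitEnd]. A standalone sketch using the assumed counts from above:

    val counts = Seq((2, 8L), (0, 7L), (1, 6L)) // broadcast pairs, arriving in any order
    val countList: Array[Long] =
      counts.sortWith(_._1 < _._1).map(_._2).scanLeft(0L)(_ + _).toArray
    // countList == Array(0, 7, 13, 21): number of records located before each partition

    def keep(partition: Int, elementCount: Long, limitStart: Long, limitEnd: Long): Boolean =
      limitStart - countList(partition) < elementCount &&
        (limitEnd == Long.MaxValue || limitEnd - countList(partition) >= elementCount)

    // offset 3, fetch 5 => limitStart = 3, limitEnd = 8:
    // partition 0 keeps its records 4..7, partition 1 keeps its record 1, partition 2 keeps nothing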
http://git-wip-us.apache.org/repos/asf/flink/blob/bdd7a114/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
index c9fd78c..bfabd32 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
@@ -563,7 +563,7 @@ class Table(
* Example:
*
* {{{
- * tab.orderBy("name DESC")
+ * tab.orderBy("name.desc")
* }}}
*/
def orderBy(fields: String): Table = {
@@ -572,45 +572,39 @@ class Table(
}
/**
- * LIMIT is called an argument since it is technically part of the ORDER BY clause.
- * The statement is used to retrieve records from table and limit the number of records
- * returned based on a limit value.
- *
- * Example:
- *
- * {{{
- * tab.orderBy('name.desc).limit(3)
- * }}}
- *
- * @param offset The number of rows to skip before including them in the result.
- */
+ * Limits a sorted result from an offset position.
+ * Similar to a SQL LIMIT clause. Limit is technically part of the Order By operator and
+ * thus must be preceded by it.
+ *
+ * Example:
+ *
+ * {{{
+ * // returns an unlimited number of records beginning with the 4th record
+ * tab.orderBy('name.desc).limit(3)
+ * }}}
+ *
+ * @param offset number of records to skip
+ */
def limit(offset: Int): Table = {
- if (offset < 0) {
- throw new ValidationException("Offset should be greater than or equal to zero.")
- }
- new Table(tableEnv, Limit(offset, -1, logicalPlan).validate(tableEnv))
- }
-
- /**
- * LIMIT is called an argument since it is technically part of the ORDER BY clause.
- * The statement is used to retrieve records from table and limit the number of records
- * returned based on a limit value.
- *
- * Example:
- *
- * {{{
- * tab.orderBy('name.desc).limit(3, 5)
- * }}}
- *
- * @param offset The number of rows to skip before including them in the result.
- * @param fetch The number of records returned.
- */
+ new Table(tableEnv, Limit(offset = offset, child = logicalPlan).validate(tableEnv))
+ }
+
+ /**
+ * Limits a sorted result to a specified number of records from an offset position.
+ * Similar to a SQL LIMIT clause. Limit is technically part of the Order By operator and
+ * thus must be preceded by it.
+ *
+ * Example:
+ *
+ * {{{
+ * // returns 5 records beginning with the 4th record
+ * tab.orderBy('name.desc).limit(3, 5)
+ * }}}
+ *
+ * @param offset number of records to skip
+ * @param fetch number of records to be returned
+ */
def limit(offset: Int, fetch: Int): Table = {
- if (offset < 0 || fetch < 1) {
- throw new ValidationException(
- "Offset should be greater than or equal to zero and" +
- " fetch should be greater than or equal to one.")
- }
new Table(tableEnv, Limit(offset, fetch, logicalPlan).validate(tableEnv))
}
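A short usage sketch of the two overloads side by side (hypothetical table tab):

    tab.orderBy('name.desc).limit(5)    // skip the first 5 records, return the rest
    tab.orderBy('name.desc).limit(0, 5) // return the first 5 records
    tab.orderBy('name.desc).limit(3, 5) // skip 3 records, then return the next 5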
http://git-wip-us.apache.org/repos/asf/flink/blob/bdd7a114/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala
index 7c18e14..f345984 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala
@@ -25,7 +25,7 @@ import org.apache.flink.api.scala.batch.utils.SortTestUtils._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala._
-import org.apache.flink.api.table.{TableException, Row, TableEnvironment}
+import org.apache.flink.api.table.{Row, TableEnvironment, TableException}
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.TestBaseUtils
import org.junit._
@@ -102,4 +102,38 @@ class SortITCase(
TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
}
+ @Test
+ def testOrderByLimit(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 LIMIT 5"
+
+ implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+ x.productElement(0).asInstanceOf[Int])
+
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ tEnv.registerDataSet("MyTable", ds)
+
+ val expected = sortExpectedly(tupleDataSetStrings, 0, 5)
+ val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+
+ val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
+
+ TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testLimitWithoutOrder(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT * FROM MyTable LIMIT 5"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ tEnv.registerDataSet("MyTable", ds)
+
+ tEnv.sql(sqlQuery).toDataSet[Row].collect()
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bdd7a114/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala
index c4a5a74..d4a1d8d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala
@@ -18,13 +18,13 @@
package org.apache.flink.api.scala.batch.table
+import org.apache.flink.api.scala.batch.utils.SortTestUtils._
import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.api.scala.batch.utils.SortTestUtils._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
-import org.apache.flink.api.table.{Row, TableEnvironment}
+import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException}
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.TestBaseUtils
import org.junit._
@@ -135,4 +135,33 @@ class SortITCase(
TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
}
+ @Test
+ def testOrderByFetch(): Unit = {
+ val env = getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ val t = ds.toTable(tEnv).orderBy('_1.asc).limit(0, 5)
+ implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+ x.productElement(0).asInstanceOf[Int])
+
+ val expected = sortExpectedly(tupleDataSetStrings, 0, 5)
+ val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+
+ val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
+
+ TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testFetchWithoutOrder(): Unit = {
+ val env = getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ val t = ds.toTable(tEnv).limit(0, 5)
+
+ t.toDataSet[Row].collect()
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bdd7a114/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala
index 89b0fdc..8ce1472 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala
@@ -109,4 +109,12 @@ class UnsupportedOpsTest extends StreamingMultipleProgramsTestBase {
val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
t1.minusAll(t2)
}
+
+ @Test(expected = classOf[ValidationException])
+ def testLimit(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
+ t1.limit(0, 5)
+ }
}
[2/2] flink git commit: [FLINK-3940] [table] Add support for ORDER BY OFFSET FETCH
Posted by tw...@apache.org.
[FLINK-3940] [table] Add support for ORDER BY OFFSET FETCH
This closes #2282.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0472cb9b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0472cb9b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0472cb9b
Branch: refs/heads/master
Commit: 0472cb9ba44de7223fffbc0eca0232d558730772
Parents: dc50625
Author: gallenvara <ga...@126.com>
Authored: Fri Jul 22 11:39:46 2016 +0800
Committer: twalthr <tw...@apache.org>
Committed: Thu Aug 11 10:27:11 2016 +0200
----------------------------------------------------------------------
docs/apis/table.md | 17 +++++++-
.../api/table/plan/logical/operators.scala | 23 +++++++++-
.../table/plan/nodes/dataset/DataSetSort.scala | 33 ++++++++++++---
.../plan/rules/dataSet/DataSetSortRule.scala | 25 ++---------
.../table/runtime/CountPartitionFunction.scala | 38 +++++++++++++++++
.../api/table/runtime/LimitFilterFunction.scala | 44 ++++++++++++++++++++
.../org/apache/flink/api/table/table.scala | 43 +++++++++++++++++++
.../flink/api/scala/batch/sql/SortITCase.scala | 34 +++++++++++----
.../api/scala/batch/table/SortITCase.scala | 36 ++++++++++++++++
.../api/scala/batch/utils/SortTestUtils.scala | 13 +++++-
10 files changed, 267 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0472cb9b/docs/apis/table.md
----------------------------------------------------------------------
diff --git a/docs/apis/table.md b/docs/apis/table.md
index cb56656..6793fde 100644
--- a/docs/apis/table.md
+++ b/docs/apis/table.md
@@ -687,6 +687,22 @@ Table result = in.orderBy("a.asc");
</td>
</tr>
+ <tr>
+ <td><strong>Limit</strong></td>
+ <td>
+ <p>Similar to a SQL LIMIT clause. Returns specified number of rows from offset position. It is technically part of the ORDER BY clause.</p>
+{% highlight java %}
+Table in = tableEnv.fromDataSet(ds, "a, b, c");
+Table result = in.orderBy("a.asc").limit(3);
+{% endhighlight %}
+or
+{% highlight java %}
+Table in = tableEnv.fromDataSet(ds, "a, b, c");
+Table result = in.orderBy("a.asc").limit(3, 5);
+{% endhighlight %}
+ </td>
+ </tr>
+
</tbody>
</table>
@@ -1009,7 +1025,6 @@ Among others, the following SQL features are not supported, yet:
- Timestamps are limited to milliseconds precision
- Distinct aggregates (e.g., `COUNT(DISTINCT name)`)
- Non-equi joins and Cartesian products
-- Result selection by order position (`ORDER BY OFFSET FETCH`)
- Grouping sets
*Note: Tables are joined in the order in which they are specified in the `FROM` clause. In some cases the table order must be manually tweaked to resolve Cartesian products.*
http://git-wip-us.apache.org/repos/asf/flink/blob/0472cb9b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
index ad8618c..0d4cf2c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
@@ -19,8 +19,8 @@ package org.apache.flink.api.table.plan.logical
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.logical.LogicalProject
-import org.apache.calcite.rex.{RexInputRef, RexNode}
+import org.apache.calcite.rel.logical.{LogicalSort, LogicalProject}
+import org.apache.calcite.rex.{RexLiteral, RexInputRef, RexNode}
import org.apache.calcite.tools.RelBuilder
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -150,6 +150,25 @@ case class Sort(order: Seq[Ordering], child: LogicalNode) extends UnaryNode {
}
}
+case class Limit(offset: Int, fetch: Int, child: LogicalNode) extends UnaryNode {
+ override def output: Seq[Attribute] = child.output
+
+ override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
+ child.construct(relBuilder)
+ relBuilder.limit(offset, fetch)
+ }
+
+ override def validate(tableEnv: TableEnvironment): LogicalNode = {
+ if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
+ throw new TableException(s"Limit on stream tables is currently not supported.")
+ }
+ if (!child.validate(tableEnv).isInstanceOf[Sort]) {
+ throw new TableException(s"Limit operator must follow behind orderBy clause.")
+ }
+ super.validate(tableEnv)
+ }
+}
+
case class Filter(condition: Expression, child: LogicalNode) extends UnaryNode {
override def output: Seq[Attribute] = child.output
http://git-wip-us.apache.org/repos/asf/flink/blob/0472cb9b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
index 1af03d8..ef3005c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
@@ -24,10 +24,12 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.RelFieldCollation.Direction
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rex.{RexLiteral, RexNode}
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
import org.apache.flink.api.table.BatchTableEnvironment
+import org.apache.flink.api.table.runtime.{LimitFilterFunction, CountPartitionFunction}
import org.apache.flink.api.table.typeutils.TypeConverter._
import scala.collection.JavaConverters._
@@ -37,7 +39,9 @@ class DataSetSort(
traitSet: RelTraitSet,
inp: RelNode,
collations: RelCollation,
- rowType2: RelDataType)
+ rowType2: RelDataType,
+ offset: RexNode,
+ fetch: RexNode)
extends SingleRel(cluster, traitSet, inp)
with DataSetRel{
@@ -47,7 +51,9 @@ class DataSetSort(
traitSet,
inputs.get(0),
collations,
- rowType2
+ rowType2,
+ offset,
+ fetch
)
}
@@ -71,11 +77,28 @@ class DataSetSort(
partitionedDs = partitionedDs.sortPartition(fieldCollation._1, fieldCollation._2)
}
+ val limitedDS = if (offset == null && fetch == null) {
+ partitionedDs
+ } else {
+ val limitStart = if (offset != null) RexLiteral.intValue(offset) else 0
+ val limitEnd = if (fetch != null) RexLiteral.intValue(fetch) + limitStart else Int.MaxValue
+
+ val countFunction = new CountPartitionFunction[Any]
+ val partitionCount = partitionedDs.mapPartition(countFunction)
+
+ val limitFunction = new LimitFilterFunction[Any](
+ limitStart,
+ limitEnd,
+ "countPartition")
+ partitionedDs.filter(limitFunction).withBroadcastSet(partitionCount, "countPartition")
+ }
+
+
val inputType = partitionedDs.getType
expectedType match {
case None if config.getEfficientTypeUsage =>
- partitionedDs
+ limitedDS
case _ =>
val determinedType = determineReturnType(
@@ -96,11 +119,11 @@ class DataSetSort(
getRowType.getFieldNames.asScala
)
- partitionedDs.map(mapFunc)
+ limitedDS.map(mapFunc)
}
// no conversion necessary, forward
else {
- partitionedDs
+ limitedDS
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0472cb9b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala
index b26d1de..5c1fb53 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala
@@ -18,12 +18,10 @@
package org.apache.flink.api.table.plan.rules.dataSet
-import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.core.JoinRelType
import org.apache.calcite.rel.logical.{LogicalJoin, LogicalSort}
-import org.apache.flink.api.table.TableException
import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetSort}
class DataSetSortRule
@@ -33,23 +31,6 @@ class DataSetSortRule
DataSetConvention.INSTANCE,
"DataSetSortRule") {
- /**
- * Only translate when no OFFSET or LIMIT specified
- */
- override def matches(call: RelOptRuleCall): Boolean = {
- val sort = call.rel(0).asInstanceOf[LogicalSort]
-
- if (sort.offset != null) {
- throw new TableException("ORDER BY OFFSET is currently not supported.")
- }
-
- if (sort.fetch != null) {
- throw new TableException("ORDER BY FETCH is currently not supported.")
- }
-
- sort.offset == null && sort.fetch == null
- }
-
override def convert(rel: RelNode): RelNode = {
val sort: LogicalSort = rel.asInstanceOf[LogicalSort]
@@ -61,7 +42,9 @@ class DataSetSortRule
traitSet,
convInput,
sort.getCollation,
- rel.getRowType
+ rel.getRowType,
+ sort.offset,
+ sort.fetch
)
}
}
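With matches() removed, the rule simply forwards whatever offset and fetch Calcite parsed on the LogicalSort. DataSetSort then reads them back as literals, roughly as in the DataSetSort diff above:

    val limitStart = if (sort.offset != null) RexLiteral.intValue(sort.offset) else 0
    val limitEnd = if (sort.fetch != null) RexLiteral.intValue(sort.fetch) + limitStart else Int.MaxValue
    // "ORDER BY a LIMIT 5"                              -> offset null,      fetch literal 5
    // "ORDER BY a OFFSET 2 ROWS FETCH NEXT 5 ROWS ONLY" -> offset literal 2, fetch literal 5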
http://git-wip-us.apache.org/repos/asf/flink/blob/0472cb9b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/CountPartitionFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/CountPartitionFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/CountPartitionFunction.scala
new file mode 100644
index 0000000..79b8623
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/CountPartitionFunction.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.RichMapPartitionFunction
+import org.apache.flink.util.Collector
+
+class CountPartitionFunction[IN] extends RichMapPartitionFunction[IN, (Int, Int)] {
+ var elementCount = 0
+
+ override def mapPartition(value: Iterable[IN], out: Collector[(Int, Int)]): Unit = {
+ val partitionIndex = getRuntimeContext.getIndexOfThisSubtask
+ val iterator = value.iterator()
+ while (iterator.hasNext) {
+ elementCount += 1
+ iterator.next()
+ }
+ out.collect(partitionIndex, elementCount)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0472cb9b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/LimitFilterFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/LimitFilterFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/LimitFilterFunction.scala
new file mode 100644
index 0000000..311b616
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/LimitFilterFunction.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime
+
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.configuration.Configuration
+
+import scala.collection.mutable
+import scala.collection.JavaConverters._
+
+class LimitFilterFunction[T](limitStart: Int,
+ limitEnd: Int,
+ broadcast: String) extends RichFilterFunction[T] {
+ var elementCount = 0
+ var countList = mutable.Buffer[Int]()
+
+ override def open(config: Configuration) {
+ countList = getRuntimeContext.getBroadcastVariable[(Int, Int)](broadcast).asScala
+ .sortWith(_._1 < _._1).map(_._2).scanLeft(0) (_ + _)
+ }
+
+ override def filter(value: T): Boolean = {
+ val partitionIndex = getRuntimeContext.getIndexOfThisSubtask
+ elementCount += 1
+ limitStart - countList(partitionIndex) < elementCount &&
+ limitEnd - countList(partitionIndex) >= elementCount
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0472cb9b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
index cbb9a07..c9fd78c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
@@ -572,6 +572,49 @@ class Table(
}
/**
+ * LIMIT is called an argument since it is technically part of the ORDER BY clause.
+ * The statement is used to retrieve records from table and limit the number of records
+ * returned based on a limit value.
+ *
+ * Example:
+ *
+ * {{{
+ * tab.orderBy('name.desc).limit(3)
+ * }}}
+ *
+ * @param offset The number of rows to skip before including them in the result.
+ */
+ def limit(offset: Int): Table = {
+ if (offset < 0) {
+ throw new ValidationException("Offset should be greater than or equal to zero.")
+ }
+ new Table(tableEnv, Limit(offset, -1, logicalPlan).validate(tableEnv))
+ }
+
+ /**
+ * LIMIT is called an argument since it is technically part of the ORDER BY clause.
+ * The statement is used to retrieve records from table and limit the number of records
+ * returned based on a limit value.
+ *
+ * Example:
+ *
+ * {{{
+ * tab.orderBy('name.desc).limit(3, 5)
+ * }}}
+ *
+ * @param offset The number of rows to skip before including them in the result.
+ * @param fetch The number of records returned.
+ */
+ def limit(offset: Int, fetch: Int): Table = {
+ if (offset < 0 || fetch < 1) {
+ throw new ValidationException(
+ "Offset should be greater than or equal to zero and" +
+ " fetch should be greater than or equal to one.")
+ }
+ new Table(tableEnv, Limit(offset, fetch, logicalPlan).validate(tableEnv))
+ }
+
+ /**
* Writes the [[Table]] to a [[TableSink]]. A [[TableSink]] defines an external storage location.
*
* A batch [[Table]] can only be written to a
http://git-wip-us.apache.org/repos/asf/flink/blob/0472cb9b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala
index 858f75a..7c18e14 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala
@@ -60,28 +60,46 @@ class SortITCase(
TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
}
- @Test(expected = classOf[TableException])
- def testOrderByOffset(): Unit = {
+ @Test
+ def testOrderByWithOffset(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)
- val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 OFFSET 2 ROWS"
+ val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 DESC OFFSET 2 ROWS"
+
+ implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+ -x.productElement(0).asInstanceOf[Int])
val ds = CollectionDataSets.get3TupleDataSet(env)
tEnv.registerDataSet("MyTable", ds)
- tEnv.sql(sqlQuery).toDataSet[Row]
+
+ val expected = sortExpectedly(tupleDataSetStrings, 2, 21)
+ val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+
+ val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
+
+ TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
}
- @Test(expected = classOf[TableException])
- def testOrderByFirst(): Unit = {
+ @Test
+ def testOrderByWithOffsetAndFetch(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)
- val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 FETCH NEXT 2 ROWS ONLY"
+ val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 OFFSET 2 ROWS FETCH NEXT 5 ROWS ONLY"
+
+ implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+ x.productElement(0).asInstanceOf[Int])
val ds = CollectionDataSets.get3TupleDataSet(env)
tEnv.registerDataSet("MyTable", ds)
- tEnv.sql(sqlQuery).toDataSet[Row]
+
+ val expected = sortExpectedly(tupleDataSetStrings, 2, 7)
+ val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+
+ val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
+
+ TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0472cb9b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala
index 235fc45..c4a5a74 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala
@@ -99,4 +99,40 @@ class SortITCase(
TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
}
+ @Test
+ def testOrderByOffset(): Unit = {
+ val env = getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ val t = ds.toTable(tEnv).orderBy('_1.asc).limit(3)
+ implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+ x.productElement(0).asInstanceOf[Int])
+
+ val expected = sortExpectedly(tupleDataSetStrings, 3, 21)
+ val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+
+ val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
+
+ TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
+ }
+
+ @Test
+ def testOrderByOffsetAndFetch(): Unit = {
+ val env = getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ val t = ds.toTable(tEnv).orderBy('_1.asc).limit(3, 5)
+ implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+ x.productElement(0).asInstanceOf[Int])
+
+ val expected = sortExpectedly(tupleDataSetStrings, 3, 8)
+ val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+
+ val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
+
+ TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0472cb9b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/SortTestUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/SortTestUtils.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/SortTestUtils.scala
index 07765fd..8d1f653 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/SortTestUtils.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/SortTestUtils.scala
@@ -42,8 +42,17 @@ object SortTestUtils {
,(20, 6L, "Comment#14")
,(21, 6L, "Comment#15"))
- def sortExpectedly(dataSet: List[Product])(implicit ordering: Ordering[Product]): String = {
- dataSet.sorted(ordering).mkString("\n").replaceAll("[\\(\\)]", "")
+ def sortExpectedly(dataSet: List[Product])
+ (implicit ordering: Ordering[Product]): String =
+ sortExpectedly(dataSet, 0, dataSet.length)
+
+ def sortExpectedly(dataSet: List[Product], start: Int, end: Int)
+ (implicit ordering: Ordering[Product]): String = {
+ dataSet
+ .sorted(ordering)
+ .slice(start, end)
+ .mkString("\n")
+ .replaceAll("[\\(\\)]", "")
}
}
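For reference, how the new overload is used by the tests above -- a zero-based slice of the sorted expectation:

    sortExpectedly(tupleDataSetStrings, 3, 8) // sorted rows at positions 3..7,
                                              // the expected result of orderBy('_1.asc).limit(3, 5)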