Posted to issues@flink.apache.org by fhueske <gi...@git.apache.org> on 2017/10/12 09:09:12 UTC
[GitHub] flink pull request #4813: [FLINK-7821] [table] Deprecate Table.limit() and r...
GitHub user fhueske opened a pull request:
https://github.com/apache/flink/pull/4813
[FLINK-7821] [table] Deprecate Table.limit() and replace it by Table.offset() and Table.fetch().
## What is the purpose of the change
- `Table.limit(n)` has the unexpected semantics of returning all but the first `n` rows.
- Returning the first `n` rows from a sorted result requires specifying a 0 offset (`table.orderBy(...).limit(0, n)`), which is more complicated than necessary.
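To make the two points above concrete, here is a minimal model in plain Java. It is not Flink code: the class `LimitSemanticsModel` and its helpers are hypothetical and operate on an already sorted `List`, mirroring only the semantics described in the bullets.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the described semantics on a sorted list; not the Flink API.
final class LimitSemanticsModel {

    // Old one-argument limit(n): skips the first n rows and returns all the rest.
    static <T> List<T> limit(List<T> sorted, int n) {
        return sorted.subList(Math.min(n, sorted.size()), sorted.size());
    }

    // Old two-argument limit(offset, fetch): skips offset rows, returns at most fetch rows.
    static <T> List<T> limit(List<T> sorted, int offset, int fetch) {
        int from = Math.min(offset, sorted.size());
        int to = Math.min(from + fetch, sorted.size());
        return sorted.subList(from, to);
    }

    // New fetch(n): returns at most the first n rows, no explicit 0 offset needed.
    static <T> List<T> fetch(List<T> sorted, int n) {
        return sorted.subList(0, Math.min(n, sorted.size()));
    }

    public static void main(String[] args) {
        List<Integer> rows = Arrays.asList(1, 2, 3, 4, 5, 6);
        System.out.println(limit(rows, 2));    // [3, 4, 5, 6]
        System.out.println(limit(rows, 0, 2)); // [1, 2]
        System.out.println(fetch(rows, 2));    // [1, 2]
    }
}
```

In this model `limit(rows, 2)` drops two rows rather than keeping two, which is the surprise the change is meant to remove.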
## Brief change log
- deprecate `Table.limit()` and replace it by `Table.offset()` and `Table.fetch()`
## Verifying this change
- existing tests for `limit` were adapted for `offset` and `fetch`.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): **no**
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
- The serializers: **no**
- The runtime per-record code paths (performance sensitive): **no**
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
## Documentation
- Documentation was adapted.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/fhueske/flink tableOffsetFetch
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/4813.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #4813
----
commit 7b1e3cb03a8e863699940b152f727551c2d43ea0
Author: Fabian Hueske <fh...@apache.org>
Date: 2017-10-12T08:56:19Z
[FLINK-7821] [table] Deprecate Table.limit() and replace it by Table.offset() and Table.fetch().
----
---
[GitHub] flink pull request #4813: [FLINK-7821] [table] Deprecate Table.limit() and r...
Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:
https://github.com/apache/flink/pull/4813#discussion_r146423134
--- Diff: docs/dev/table/tableApi.md ---
@@ -1031,19 +1034,22 @@ val result = in.orderBy('a.asc);
<tr>
<td>
- <strong>Limit</strong><br>
+ <strong>Offset & Fetch</strong><br>
<span class="label label-primary">Batch</span>
</td>
<td>
- <p>Similar to a SQL LIMIT clause. Limits a sorted result to a specified number of records from an offset position. Limit is technically part of the Order By operator and thus must be preceded by it.</p>
+ <p>Similar to the SQL OFFSET and FETCH clauses. Offset and Fetch limit the number of records returned from a sorted result. Offset and Fetch are technically part of the Order By operator and thus must be preceded by it.</p>
{% highlight scala %}
-val in = ds.toTable(tableEnv, 'a, 'b, 'c);
-val result = in.orderBy('a.asc).limit(3); // returns unlimited number of records beginning with the 4th record
-{% endhighlight %}
-or
-{% highlight scala %}
-val in = ds.toTable(tableEnv, 'a, 'b, 'c);
-val result = in.orderBy('a.asc).limit(3, 5); // returns 5 records beginning with the 4th record
+val in = ds.toTable(tableEnv, 'a, 'b, 'c)
+
+// returns the first 5 records from the sorted result
+val result1: Table = in.orderBy('a.asc).fetch(5)
+
+// returns all records beginning with the 4th record from the sorted result
+val result2: Table = in.orderBy('a.asc).offset(3)
+
+// returns the first 5 records beginning with the 10th record from the sorted result
--- End diff --
Should be 11th?
---
[GitHub] flink pull request #4813: [FLINK-7821] [table] Deprecate Table.limit() and r...
Posted by hustfxj <gi...@git.apache.org>.
Github user hustfxj commented on a diff in the pull request:
https://github.com/apache/flink/pull/4813#discussion_r144257631
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/SortValidationTest.scala ---
@@ -27,10 +27,26 @@ import org.junit._
class SortValidationTest extends TableTestBase {
--- End diff --
It would be better to add the following two validation tests:
```
ds.orderBy('a.asc).offset(10).offset(10)
ds.orderBy('a.asc).fetch(5)
```
Currently `fetch` can only be used after `offset`; maybe `fetch` could also be used directly after `orderBy`, so that `ds.orderBy('a.asc).fetch(5)` is equivalent to `ds.orderBy('a.asc).offset(0).fetch(5)`.
---
[GitHub] flink pull request #4813: [FLINK-7821] [table] Deprecate Table.limit() and r...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/4813#discussion_r146489748
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ---
@@ -726,7 +726,10 @@ class Table(
* }}}
*
* @param offset number of records to skip
+ *
+ * @deprecated Please use [[Table.offset()]] and [[Table.fetch()]] instead.
*/
+ @deprecated(message = "Deprecated in favor of Table.offset() and Table.fetch()", since = "1.4.0")
--- End diff --
This kind of deprecation does not work for Java. Please add the Java annotation as well.
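For reference, a sketch of what the Java-side marker looks like. The class and method below are hypothetical stand-ins, not Flink's `Table`. Java tooling keys on the `java.lang.Deprecated` annotation, usually paired with a Javadoc `@deprecated` tag; the reflection check only demonstrates that the annotation is visible at runtime.

```java
import java.lang.reflect.Method;

// Hypothetical stand-in for a class with a deprecated method; not the Flink Table class.
final class DeprecationSketch {

    /**
     * Skips the first {@code offset} records (placeholder body for illustration).
     *
     * @deprecated Use offset and fetch instead.
     */
    @Deprecated
    static int limit(int offset) {
        return offset;
    }

    // Returns true if the named method carries the java.lang.Deprecated annotation.
    static boolean isMarkedDeprecated(String methodName, Class<?>... params) {
        try {
            Method m = DeprecationSketch.class.getDeclaredMethod(methodName, params);
            return m.isAnnotationPresent(Deprecated.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("no such method: " + methodName, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(isMarkedDeprecated("limit", int.class)); // true
    }
}
```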
---
[GitHub] flink issue #4813: [FLINK-7821] [table] Deprecate Table.limit() and replace ...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:
https://github.com/apache/flink/pull/4813
Thanks for the reviews @xccui and @twalthr.
I'll address the feedback and merge this PR.
---
[GitHub] flink pull request #4813: [FLINK-7821] [table] Deprecate Table.limit() and r...
Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:
https://github.com/apache/flink/pull/4813#discussion_r146423916
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ---
@@ -745,12 +748,65 @@ class Table(
*
* @param offset number of records to skip
* @param fetch number of records to be returned
+ *
+ * @deprecated Please use [[Table.offset()]] and [[Table.fetch()]] instead.
*/
+ @deprecated(message = "deprecated in favor of Table.offset() and Table.fetch()", since = "1.4.0")
def limit(offset: Int, fetch: Int): Table = {
new Table(tableEnv, Limit(offset, fetch, logicalPlan).validate(tableEnv))
}
/**
+ * Limits a sorted result from an offset position.
+ * Similar to a SQL OFFSET clause. Offset is technically part of the Order By operator and
+ * thus must be preceded by it.
+ *
+ * [[Table.offset(o)]] can be combined with a subsequent [[Table.fetch(n)]] call to return the
+ * first n rows starting from the offset position o.
--- End diff --
If I understand correctly, "from the offset position o" means "from the (o+1)th record", right?
---
[GitHub] flink pull request #4813: [FLINK-7821] [table] Deprecate Table.limit() and r...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/4813#discussion_r144268057
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/SortValidationTest.scala ---
@@ -27,10 +27,26 @@ import org.junit._
class SortValidationTest extends TableTestBase {
--- End diff --
Right, I'll add a test that validates that `table.orderBy('a.asc).offset(10).offset(10)` is rejected.
`table.orderBy('a.asc).fetch(5)` is already supported as you suggested (see the modified `SortITCase`).
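The two behaviours discussed here can be modelled in plain Java as follows. This is only a sketch under assumptions: `SortedRows` is a hypothetical class, and `IllegalStateException` stands in for whatever validation error the Table API actually raises.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of chaining offset/fetch on a sorted result; not Flink code.
final class SortedRows {
    private final List<Integer> rows;      // assumed to be sorted already
    private final boolean offsetApplied;   // tracks whether offset() was called before

    SortedRows(List<Integer> rows) {
        this(rows, false);
    }

    private SortedRows(List<Integer> rows, boolean offsetApplied) {
        this.rows = rows;
        this.offsetApplied = offsetApplied;
    }

    // Skips the first n rows. A second offset on the same result is rejected.
    SortedRows offset(int n) {
        if (offsetApplied) {
            throw new IllegalStateException("offset() was already applied");
        }
        int from = Math.min(n, rows.size());
        return new SortedRows(rows.subList(from, rows.size()), true);
    }

    // Returns at most n rows; works with or without a preceding offset().
    List<Integer> fetch(int n) {
        return rows.subList(0, Math.min(n, rows.size()));
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);
        System.out.println(new SortedRows(data).fetch(5));            // [1, 2, 3, 4, 5]
        System.out.println(new SortedRows(data).offset(0).fetch(5));  // [1, 2, 3, 4, 5]
        try {
            new SortedRows(data).offset(10).offset(10);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Having `fetch` end the chain in this model sidesteps combinations the thread does not discuss, such as `fetch` followed by `offset`.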
---
[GitHub] flink pull request #4813: [FLINK-7821] [table] Deprecate Table.limit() and r...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/4813#discussion_r146540313
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ---
@@ -745,12 +748,65 @@ class Table(
*
* @param offset number of records to skip
* @param fetch number of records to be returned
+ *
+ * @deprecated Please use [[Table.offset()]] and [[Table.fetch()]] instead.
*/
+ @deprecated(message = "deprecated in favor of Table.offset() and Table.fetch()", since = "1.4.0")
def limit(offset: Int, fetch: Int): Table = {
new Table(tableEnv, Limit(offset, fetch, logicalPlan).validate(tableEnv))
}
/**
+ * Limits a sorted result from an offset position.
+ * Similar to a SQL OFFSET clause. Offset is technically part of the Order By operator and
+ * thus must be preceded by it.
+ *
+ * [[Table.offset(o)]] can be combined with a subsequent [[Table.fetch(n)]] call to return the
+ * first n rows starting from the offset position o.
--- End diff --
will do
---
[GitHub] flink pull request #4813: [FLINK-7821] [table] Deprecate Table.limit() and r...
Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:
https://github.com/apache/flink/pull/4813#discussion_r146426403
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ---
@@ -745,12 +748,65 @@ class Table(
*
* @param offset number of records to skip
* @param fetch number of records to be returned
+ *
+ * @deprecated Please use [[Table.offset()]] and [[Table.fetch()]] instead.
*/
+ @deprecated(message = "deprecated in favor of Table.offset() and Table.fetch()", since = "1.4.0")
def limit(offset: Int, fetch: Int): Table = {
new Table(tableEnv, Limit(offset, fetch, logicalPlan).validate(tableEnv))
}
/**
+ * Limits a sorted result from an offset position.
+ * Similar to a SQL OFFSET clause. Offset is technically part of the Order By operator and
+ * thus must be preceded by it.
+ *
+ * [[Table.offset(o)]] can be combined with a subsequent [[Table.fetch(n)]] call to return the
+ * first n rows starting from the offset position o.
+ *
+ * {{{
+ * // returns unlimited number of records beginning with the 4th record
+ * tab.orderBy('name.desc).offset(3)
+ * // return the first 5 records starting from the 10th record
+ * tab.orderBy('name.desc).offset(10).fetch(5)
+ * }}}
+ *
+ * @param offset number of records to skip
+ */
+ def offset(offset: Int): Table = {
+ new Table(tableEnv, Limit(offset, -1, logicalPlan).validate(tableEnv))
+ }
+
+ /**
+ * Limits a sorted result to the first n rows.
+ * Similar to a SQL FETCH clause. Fetch is technically part of the Order By operator and
+ * thus must be preceded by it.
+ *
+ * [[Table.fetch(n)]] can be combined with a preceding [[Table.offset(o)]] call to return the
+ * first n rows starting from the offset position o.
+ *
+ * {{{
+ * // returns the first 3 records.
+ * tab.orderBy('name.desc).fetch(3)
+ * // return the first 5 records starting from the 10th record
+ * tab.orderBy('name.desc).offset(10).fetch(5)
+ * }}}
+ *
+ * @param fetch the number of records to return
+ */
+ def fetch(fetch: Int): Table = {
--- End diff --
Since `fetch: Int < 0` is allowed, maybe we should explicitly define the semantics of it?
---
[GitHub] flink pull request #4813: [FLINK-7821] [table] Deprecate Table.limit() and r...
Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:
https://github.com/apache/flink/pull/4813#discussion_r146423953
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ---
@@ -745,12 +748,65 @@ class Table(
*
* @param offset number of records to skip
* @param fetch number of records to be returned
+ *
+ * @deprecated Please use [[Table.offset()]] and [[Table.fetch()]] instead.
*/
+ @deprecated(message = "deprecated in favor of Table.offset() and Table.fetch()", since = "1.4.0")
def limit(offset: Int, fetch: Int): Table = {
new Table(tableEnv, Limit(offset, fetch, logicalPlan).validate(tableEnv))
}
/**
+ * Limits a sorted result from an offset position.
+ * Similar to a SQL OFFSET clause. Offset is technically part of the Order By operator and
+ * thus must be preceded by it.
+ *
+ * [[Table.offset(o)]] can be combined with a subsequent [[Table.fetch(n)]] call to return the
+ * first n rows starting from the offset position o.
+ *
+ * {{{
+ * // returns unlimited number of records beginning with the 4th record
+ * tab.orderBy('name.desc).offset(3)
+ * // return the first 5 records starting from the 10th record
--- End diff --
11th?
---
[GitHub] flink pull request #4813: [FLINK-7821] [table] Deprecate Table.limit() and r...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/4813
---
[GitHub] flink pull request #4813: [FLINK-7821] [table] Deprecate Table.limit() and r...
Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:
https://github.com/apache/flink/pull/4813#discussion_r146424180
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ---
@@ -745,12 +748,65 @@ class Table(
*
* @param offset number of records to skip
* @param fetch number of records to be returned
+ *
+ * @deprecated Please use [[Table.offset()]] and [[Table.fetch()]] instead.
*/
+ @deprecated(message = "deprecated in favor of Table.offset() and Table.fetch()", since = "1.4.0")
def limit(offset: Int, fetch: Int): Table = {
new Table(tableEnv, Limit(offset, fetch, logicalPlan).validate(tableEnv))
}
/**
+ * Limits a sorted result from an offset position.
+ * Similar to a SQL OFFSET clause. Offset is technically part of the Order By operator and
+ * thus must be preceded by it.
+ *
+ * [[Table.offset(o)]] can be combined with a subsequent [[Table.fetch(n)]] call to return the
+ * first n rows starting from the offset position o.
+ *
+ * {{{
+ * // returns unlimited number of records beginning with the 4th record
+ * tab.orderBy('name.desc).offset(3)
+ * // return the first 5 records starting from the 10th record
+ * tab.orderBy('name.desc).offset(10).fetch(5)
+ * }}}
+ *
+ * @param offset number of records to skip
+ */
+ def offset(offset: Int): Table = {
+ new Table(tableEnv, Limit(offset, -1, logicalPlan).validate(tableEnv))
+ }
+
+ /**
+ * Limits a sorted result to the first n rows.
+ * Similar to a SQL FETCH clause. Fetch is technically part of the Order By operator and
+ * thus must be preceded by it.
+ *
+ * [[Table.fetch(n)]] can be combined with a preceding [[Table.offset(o)]] call to return the
+ * first n rows starting from the offset position o.
+ *
+ * {{{
+ * // returns the first 3 records.
+ * tab.orderBy('name.desc).fetch(3)
+ * // return the first 5 records starting from the 10th record
--- End diff --
11th?
---
[GitHub] flink pull request #4813: [FLINK-7821] [table] Deprecate Table.limit() and r...
Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:
https://github.com/apache/flink/pull/4813#discussion_r146423039
--- Diff: docs/dev/table/tableApi.md ---
@@ -985,19 +985,22 @@ Table result = in.orderBy("a.asc");
<tr>
<td>
- <strong>Limit</strong><br>
+ <strong>Offset & Fetch</strong><br>
<span class="label label-primary">Batch</span>
</td>
<td>
- <p>Similar to a SQL LIMIT clause. Limits a sorted result to a specified number of records from an offset position. Limit is technically part of the Order By operator and thus must be preceded by it.</p>
+ <p>Similar to the SQL OFFSET and FETCH clauses. Offset and Fetch limit the number of records returned from a sorted result. Offset and Fetch are technically part of the Order By operator and thus must be preceded by it.</p>
{% highlight java %}
Table in = tableEnv.fromDataSet(ds, "a, b, c");
-Table result = in.orderBy("a.asc").limit(3); // returns unlimited number of records beginning with the 4th record
-{% endhighlight %}
-or
-{% highlight java %}
-Table in = tableEnv.fromDataSet(ds, "a, b, c");
-Table result = in.orderBy("a.asc").limit(3, 5); // returns 5 records beginning with the 4th record
+
+// returns the first 5 records from the sorted result
+Table result1 = in.orderBy("a.asc").fetch(5);
+
+// returns all records beginning with the 4th record from the sorted result
+Table result2 = in.orderBy("a.asc").offset(3);
+
+// returns the first 5 records beginning with the 10th record from the sorted result
--- End diff --
Should be 11th?
---
[GitHub] flink pull request #4813: [FLINK-7821] [table] Deprecate Table.limit() and r...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/4813#discussion_r146490693
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ---
@@ -745,12 +748,65 @@ class Table(
*
* @param offset number of records to skip
* @param fetch number of records to be returned
+ *
+ * @deprecated Please use [[Table.offset()]] and [[Table.fetch()]] instead.
*/
+ @deprecated(message = "deprecated in favor of Table.offset() and Table.fetch()", since = "1.4.0")
def limit(offset: Int, fetch: Int): Table = {
new Table(tableEnv, Limit(offset, fetch, logicalPlan).validate(tableEnv))
}
/**
+ * Limits a sorted result from an offset position.
+ * Similar to a SQL OFFSET clause. Offset is technically part of the Order By operator and
+ * thus must be preceded by it.
+ *
+ * [[Table.offset(o)]] can be combined with a subsequent [[Table.fetch(n)]] call to return the
+ * first n rows starting from the offset position o.
--- End diff --
Maybe you can add a note about SQL indices here, as this might confuse users. Or you could add `skips 3 rows and begins with the 4th record`.
---
[GitHub] flink pull request #4813: [FLINK-7821] [table] Deprecate Table.limit() and r...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/4813#discussion_r146490303
--- Diff: docs/dev/table/tableApi.md ---
@@ -985,19 +985,22 @@ Table result = in.orderBy("a.asc");
<tr>
<td>
- <strong>Limit</strong><br>
+ <strong>Offset & Fetch</strong><br>
<span class="label label-primary">Batch</span>
</td>
<td>
- <p>Similar to a SQL LIMIT clause. Limits a sorted result to a specified number of records from an offset position. Limit is technically part of the Order By operator and thus must be preceded by it.</p>
+ <p>Similar to the SQL OFFSET and FETCH clauses. Offset and Fetch limit the number of records returned from a sorted result. Offset and Fetch are technically part of the Order By operator and thus must be preceded by it.</p>
{% highlight java %}
Table in = tableEnv.fromDataSet(ds, "a, b, c");
-Table result = in.orderBy("a.asc").limit(3); // returns unlimited number of records beginning with the 4th record
-{% endhighlight %}
-or
-{% highlight java %}
-Table in = tableEnv.fromDataSet(ds, "a, b, c");
-Table result = in.orderBy("a.asc").limit(3, 5); // returns 5 records beginning with the 4th record
+
+// returns the first 5 records from the sorted result
+Table result1 = in.orderBy("a.asc").fetch(5);
+
+// returns all records beginning with the 4th record from the sorted result
+Table result2 = in.orderBy("a.asc").offset(3);
+
+// returns the first 5 records beginning with the 10th record from the sorted result
--- End diff --
No, it must be 10th, because SQL indices start at 1.
---
[GitHub] flink pull request #4813: [FLINK-7821] [table] Deprecate Table.limit() and r...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/4813#discussion_r146542343
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ---
@@ -745,12 +748,65 @@ class Table(
*
* @param offset number of records to skip
* @param fetch number of records to be returned
+ *
+ * @deprecated Please use [[Table.offset()]] and [[Table.fetch()]] instead.
*/
+ @deprecated(message = "deprecated in favor of Table.offset() and Table.fetch()", since = "1.4.0")
def limit(offset: Int, fetch: Int): Table = {
new Table(tableEnv, Limit(offset, fetch, logicalPlan).validate(tableEnv))
}
/**
+ * Limits a sorted result from an offset position.
+ * Similar to a SQL OFFSET clause. Offset is technically part of the Order By operator and
+ * thus must be preceded by it.
+ *
+ * [[Table.offset(o)]] can be combined with a subsequent [[Table.fetch(n)]] call to return the
+ * first n rows starting from the offset position o.
+ *
+ * {{{
+ * // returns unlimited number of records beginning with the 4th record
+ * tab.orderBy('name.desc).offset(3)
+ * // return the first 5 records starting from the 10th record
+ * tab.orderBy('name.desc).offset(10).fetch(5)
+ * }}}
+ *
+ * @param offset number of records to skip
+ */
+ def offset(offset: Int): Table = {
+ new Table(tableEnv, Limit(offset, -1, logicalPlan).validate(tableEnv))
+ }
+
+ /**
+ * Limits a sorted result to the first n rows.
+ * Similar to a SQL FETCH clause. Fetch is technically part of the Order By operator and
+ * thus must be preceded by it.
+ *
+ * [[Table.fetch(n)]] can be combined with a preceding [[Table.offset(o)]] call to return the
+ * first n rows starting from the offset position o.
+ *
+ * {{{
+ * // returns the first 3 records.
+ * tab.orderBy('name.desc).fetch(3)
+ * // return the first 5 records starting from the 10th record
+ * tab.orderBy('name.desc).offset(10).fetch(5)
+ * }}}
+ *
+ * @param fetch the number of records to return
+ */
+ def fetch(fetch: Int): Table = {
--- End diff --
Good point, thanks @xccui
---
[GitHub] flink pull request #4813: [FLINK-7821] [table] Deprecate Table.limit() and r...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/4813#discussion_r146605323
--- Diff: docs/dev/table/tableApi.md ---
@@ -985,19 +985,22 @@ Table result = in.orderBy("a.asc");
<tr>
<td>
- <strong>Limit</strong><br>
+ <strong>Offset & Fetch</strong><br>
<span class="label label-primary">Batch</span>
</td>
<td>
- <p>Similar to a SQL LIMIT clause. Limits a sorted result to a specified number of records from an offset position. Limit is technically part of the Order By operator and thus must be preceded by it.</p>
+ <p>Similar to the SQL OFFSET and FETCH clauses. Offset and Fetch limit the number of records returned from a sorted result. Offset and Fetch are technically part of the Order By operator and thus must be preceded by it.</p>
{% highlight java %}
Table in = tableEnv.fromDataSet(ds, "a, b, c");
-Table result = in.orderBy("a.asc").limit(3); // returns unlimited number of records beginning with the 4th record
-{% endhighlight %}
-or
-{% highlight java %}
-Table in = tableEnv.fromDataSet(ds, "a, b, c");
-Table result = in.orderBy("a.asc").limit(3, 5); // returns 5 records beginning with the 4th record
+
+// returns the first 5 records from the sorted result
+Table result1 = in.orderBy("a.asc").fetch(5);
+
+// returns all records beginning with the 4th record from the sorted result
+Table result2 = in.orderBy("a.asc").offset(3);
+
+// returns the first 5 records beginning with the 10th record from the sorted result
--- End diff --
I think @xccui is right. An offset of 10 means that the first 10 rows are omitted from the result.
Otherwise, an offset of 1 would not skip any rows.
I'll update the docs.
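A small worked example of that reading, written as plain Java on a list rather than against the Table API. The class and helper names are hypothetical, and positions are counted from 1 as in the doc comments under review.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of "offset = number of rows skipped"; not Flink code.
final class OffsetPositionExample {

    // 1-based position of the first record returned after skipping `offset` records.
    static int firstReturnedPosition(int offset) {
        return offset + 1;
    }

    // Models offset(o).fetch(n) on an already sorted list.
    static <T> List<T> offsetFetch(List<T> sorted, int offset, int fetch) {
        int from = Math.min(offset, sorted.size());
        int to = Math.min(from + fetch, sorted.size());
        return new ArrayList<>(sorted.subList(from, to));
    }

    public static void main(String[] args) {
        List<Integer> rows = new ArrayList<>();
        for (int i = 1; i <= 20; i++) {
            rows.add(i); // value i sits at 1-based position i
        }
        System.out.println(firstReturnedPosition(10)); // 11
        System.out.println(offsetFetch(rows, 10, 5));  // [11, 12, 13, 14, 15]
        System.out.println(offsetFetch(rows, 1, 3));   // [2, 3, 4]
    }
}
```

With an offset of 1 the first row is skipped, so the result starts at the 2nd record; by the same rule an offset of 10 starts at the 11th.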
---