Posted to commits@flink.apache.org by ja...@apache.org on 2019/09/03 12:00:21 UTC

[flink] branch master updated: [FLINK-13356][table][docs] Add documentation for TopN and Deduplication in blink planner

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ec4c0c3  [FLINK-13356][table][docs] Add documentation for TopN and Deduplication in blink planner
ec4c0c3 is described below

commit ec4c0c35a55253ee4776c88889359fc682d2feae
Author: Jark Wu <im...@gmail.com>
AuthorDate: Thu Aug 22 20:00:28 2019 +0800

    [FLINK-13356][table][docs] Add documentation for TopN and Deduplication in blink planner
    
    This closes #9511
---
 docs/dev/table/sql.md    | 223 ++++++++++++++++++++++++++++++++++++++++++++++-
 docs/dev/table/sql.zh.md | 221 ++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 443 insertions(+), 1 deletion(-)

diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 0cfbdda..c3d0cd3 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -24,7 +24,7 @@ under the License.
 
 This is a complete list of Data Definition Language (DDL) and Data Manipulation Language (DML) constructs supported in Flink.
 * This will be replaced by the TOC
-{:toc} 
+{:toc}
 
 ## Query
 SQL queries are specified with the `sqlQuery()` method of the `TableEnvironment`. The method returns the result of the SQL query as a `Table`. A `Table` can be used in [subsequent SQL and Table API queries](common.html#mixing-table-api-and-sql), be [converted into a DataSet or DataStream](common.html#integration-with-datastream-and-dataset-api), or [written to a TableSink](common.html#emit-a-table)). SQL and Table API queries can be seamlessly mixed and are holistically optimized and tra [...]
@@ -835,6 +835,227 @@ LIMIT 3
 
 {% top %}
 
+#### Top-N
+
+<span class="label label-danger">Attention</span> Top-N is only supported in Blink planner.
+
+Top-N queries ask for the N smallest or largest values ordered by columns. Both the smallest and largest value sets are considered Top-N queries. Top-N queries are useful in cases where you need to display only the N bottom-most or the N top-most records from a batch/streaming table on a condition. This result set can be used for further analysis.
+
+Flink uses the combination of an OVER window clause and a filter condition to express a Top-N query. With the power of the OVER window `PARTITION BY` clause, Flink also supports per-group Top-N. For example, the top five products per category that have the maximum sales in real time. Top-N queries are supported for SQL on batch and streaming tables.
+
+The following shows the syntax of the Top-N statement:
+
+{% highlight sql %}
+SELECT [column_list]
+FROM (
+   SELECT [column_list],
+     ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
+       ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
+   FROM table_name)
+WHERE rownum <= N [AND conditions]
+{% endhighlight %}
+
+**Parameter Specification:**
+
+- `ROW_NUMBER()`: Assigns a unique, sequential number to each row, starting with one, according to the ordering of rows within the partition. Currently, only `ROW_NUMBER` is supported as the OVER window function. `RANK()` and `DENSE_RANK()` will be supported in the future.
+- `PARTITION BY col1[, col2...]`: Specifies the partition columns. Each partition will have a Top-N result.
+- `ORDER BY col1 [asc|desc][, col2 [asc|desc]...]`: Specifies the ordering columns. The ordering directions can be different on different columns.
+- `WHERE rownum <= N`: The `rownum <= N` is required for Flink to recognize this query as a Top-N query. N represents the number of smallest or largest records that will be retained.
+- `[AND conditions]`: You are free to add other conditions in the WHERE clause, but they can only be combined with `rownum <= N` using the `AND` conjunction, as shown in the sketch after this list.
+
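+For example, a minimal sketch of combining an extra condition with the ranking predicate, using the `ShopSales` table from the examples below (the `category = 'Electronics'` filter is an illustrative assumption):
+
+{% highlight sql %}
+SELECT product_id, category, product_name, sales, rownum
+FROM (
+   SELECT *,
+     ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) AS rownum
+   FROM ShopSales)
+-- extra conditions may only be ANDed with the ranking predicate
+WHERE rownum <= 3 AND category = 'Electronics'
+{% endhighlight %}
+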
+<span class="label label-danger">Attention in Streaming Mode</span> The TopN query is <span class="label label-info">Result Updating</span>. Flink SQL will sort the input data stream according to the order key, so if the top N records have been changed, the changed ones will be sent as retraction/update records to downstream.
+It is recommended to use a storage which supports updating as the sink of Top-N query. In addition, if the top N records need to be stored in external storage, the result table should have the same unique key with the Top-N query.
+
+The unique key of a Top-N query is the combination of the partition columns and the rownum column. A Top-N query can also derive the unique key of its upstream. Take the following job as an example: say `product_id` is the unique key of `ShopSales`; then the unique keys of the Top-N query are [`category`, `rownum`] and [`product_id`].
+
+The following examples show how to specify SQL queries with Top-N on streaming tables. This is the "top five products per category that have the maximum sales in real time" example mentioned above.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// Top-N is only supported by the Blink planner, so create the table environment with it
+EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+
+// ingest a DataStream from an external source
+DataStream<Tuple4<String, String, String, Long>> ds = env.addSource(...);
+// register the DataStream as table "ShopSales"
+tableEnv.registerDataStream("ShopSales", ds, "product_id, category, product_name, sales");
+
+// select top-5 products per category which have the maximum sales.
+Table result1 = tableEnv.sqlQuery(
+  "SELECT * " +
+  "FROM (" +
+  "   SELECT *," +
+  "       ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) as row_num" +
+  "   FROM ShopSales)" +
+  "WHERE row_num <= 5");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// Top-N is only supported by the Blink planner, so create the table environment with it
+val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tableEnv = StreamTableEnvironment.create(env, settings)
+
+// read a DataStream from an external source
+val ds: DataStream[(String, String, String, Long)] = env.addSource(...)
+// register the DataStream under the name "ShopSales"
+tableEnv.registerDataStream("ShopSales", ds, 'product_id, 'category, 'product_name, 'sales)
+
+// select top-5 products per category which have the maximum sales.
+val result1 = tableEnv.sqlQuery(
+    """
+      |SELECT *
+      |FROM (
+      |   SELECT *,
+      |       ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) as row_num
+      |   FROM ShopSales)
+      |WHERE row_num <= 5
+    """.stripMargin)
+{% endhighlight %}
+</div>
+</div>
+
+##### No Ranking Output Optimization
+
+As described above, the `rownum` field will be written into the result table as one field of the unique key, which may lead to a lot of records being written to the result table. For example, when the record ranked 9 (say `product-1001`) is updated and its rank jumps to 1, all the records ranked 1 through 9 will be output to the result table as update messages, because every record previously ranked 1 through 8 shifts down by one position. If the result table receives too much data, it will become the bottleneck of the SQL job.
+
+The way to optimize is to omit the rownum field in the outer SELECT clause of the Top-N query. This is reasonable because the number of top N records is usually not large, so the consumers can quickly sort the records themselves. Without the rownum field, in the example above only the changed record (`product-1001`) needs to be sent downstream, which can greatly reduce the IO to the result table.
+
+The following example shows how to optimize the above Top-N example in this way:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// Top-N is only supported by the Blink planner, so create the table environment with it
+EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+
+// ingest a DataStream from an external source
+DataStream<Tuple4<String, String, String, Long>> ds = env.addSource(...);
+// register the DataStream as table "ShopSales"
+tableEnv.registerDataStream("ShopSales", ds, "product_id, category, product_name, sales");
+
+// select top-5 products per category which have the maximum sales.
+Table result1 = tableEnv.sqlQuery(
+  "SELECT product_id, category, product_name, sales " + // omit row_num field in the output
+  "FROM (" +
+  "   SELECT *," +
+  "       ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) as row_num" +
+  "   FROM ShopSales)" +
+  "WHERE row_num <= 5");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// Top-N is only supported by the Blink planner, so create the table environment with it
+val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tableEnv = StreamTableEnvironment.create(env, settings)
+
+// read a DataStream from an external source
+val ds: DataStream[(String, String, String, Long)] = env.addSource(...)
+// register the DataStream under the name "ShopSales"
+tableEnv.registerDataStream("ShopSales", ds, 'product_id, 'category, 'product_name, 'sales)
+
+// select top-5 products per category which have the maximum sales.
+val result1 = tableEnv.sqlQuery(
+    """
+      |SELECT product_id, category, product_name, sales  -- omit row_num field in the output
+      |FROM (
+      |   SELECT *,
+      |       ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) as row_num
+      |   FROM ShopSales)
+      |WHERE row_num <= 5
+    """.stripMargin)
+{% endhighlight %}
+</div>
+</div>
+
+<span class="label label-danger">Attention in Streaming Mode</span> In order to output the above query to an external storage and have a correct result, the external storage must have the same unique key with the Top-N query. In the above example query, if the `product_id` is the unique key of the query, then the external table should also has `product_id` as the unique key.
+
+{% top %}
+
+#### Deduplication
+
+<span class="label label-danger">Attention</span> Deduplication is only supported in Blink planner.
+
+Deduplication removes rows that duplicate over a set of columns, keeping only the first or the last one. In some cases, the upstream ETL jobs are not end-to-end exactly-once, which may result in duplicate records in the sink in case of failover. Duplicate records will affect the correctness of downstream analytical jobs (e.g. `SUM`, `COUNT`), so deduplication is needed before further analysis.
+
+Flink uses `ROW_NUMBER()` to remove duplicates, just like in a Top-N query. In theory, deduplication is a special case of Top-N in which N is one and the rows are ordered by processing time or event time.
+
+The following shows the syntax of the Deduplication statement:
+
+{% highlight sql %}
+SELECT [column_list]
+FROM (
+   SELECT [column_list],
+     ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
+       ORDER BY time_attr [asc|desc]) AS rownum
+   FROM table_name)
+WHERE rownum = 1
+{% endhighlight %}
+
+**Parameter Specification:**
+
+- `ROW_NUMBER()`: Assigns a unique, sequential number to each row, starting with one.
+- `PARTITION BY col1[, col2...]`: Specifies the partition columns, i.e. the deduplication key.
+- `ORDER BY time_attr [asc|desc]`: Specifies the ordering column; it must be a [time attribute](streaming/time_attributes.html). Currently, only the [proctime attribute](streaming/time_attributes.html#processing-time) is supported; the [rowtime attribute](streaming/time_attributes.html#event-time) will be supported in the future. Ordering by ASC means keeping the first row, while ordering by DESC means keeping the last row, as shown in the sketch after this list.
+- `WHERE rownum = 1`: The `rownum = 1` is required for Flink to recognize this query as deduplication.
+
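+As a minimal sketch of the keep-last-row variant (ordering by DESC), assuming the `Orders` table with a `proctime` attribute as registered in the examples below:
+
+{% highlight sql %}
+-- keep the most recent row per order_id: DESC on the time attribute keeps the last row
+SELECT order_id, user, product, number
+FROM (
+   SELECT *,
+     ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY proctime DESC) AS rownum
+   FROM Orders)
+WHERE rownum = 1
+{% endhighlight %}
+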
+The following examples show how to specify SQL queries with Deduplication on streaming tables.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// Deduplication is only supported by the Blink planner, so create the table environment with it
+EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+
+// ingest a DataStream from an external source
+DataStream<Tuple4<String, String, String, Integer>> ds = env.addSource(...);
+// register the DataStream as table "Orders"
+tableEnv.registerDataStream("Orders", ds, "order_id, user, product, number, proctime.proctime");
+
+// remove duplicate rows on order_id and keep the first occurrence row,
+// because there shouldn't be two orders with the same order_id.
+Table result1 = tableEnv.sqlQuery(
+  "SELECT order_id, user, product, number " +
+  "FROM (" +
+  "   SELECT *," +
+  "       ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY proctime ASC) as row_num" +
+  "   FROM Orders)" +
+  "WHERE row_num = 1");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// Deduplication is only supported by the Blink planner, so create the table environment with it
+val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tableEnv = StreamTableEnvironment.create(env, settings)
+
+// read a DataStream from an external source
+val ds: DataStream[(String, String, String, Int)] = env.addSource(...)
+// register the DataStream under the name "Orders"
+tableEnv.registerDataStream("Orders", ds, 'order_id, 'user, 'product, 'number, 'proctime.proctime)
+
+// remove duplicate rows on order_id and keep the first occurrence row,
+// because there shouldn't be two orders with the same order_id.
+val result1 = tableEnv.sqlQuery(
+    """
+      |SELECT order_id, user, product, number
+      |FROM (
+      |   SELECT *,
+      |       ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY proctime ASC) as row_num
+      |   FROM Orders)
+      |WHERE row_num = 1
+    """.stripMargin)
+{% endhighlight %}
+</div>
+</div>
+
+{% top %}
+
 #### Insert
 
 <div markdown="1">