Posted to issues@flink.apache.org by fhueske <gi...@git.apache.org> on 2017/07/04 21:28:54 UTC

[GitHub] flink pull request #4256: [FLINK-6747] [docs] Add documentation for QueryCon...

GitHub user fhueske opened a pull request:

    https://github.com/apache/flink/pull/4256

    [FLINK-6747] [docs] Add documentation for QueryConfig.

    Needs to be merged to `master` and `release-1.3`.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/fhueske/flink tableStreamDocs

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4256.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4256
    
----
commit c5775d998d89ad86895e339466aecd47e112c6dd
Author: Fabian Hueske <fh...@apache.org>
Date:   2017-07-04T21:28:22Z

    [FLINK-6747] [docs] Add documentation for QueryConfig.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4256: [FLINK-6747] [docs] Add documentation for QueryCon...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4256#discussion_r125539485
  
    --- Diff: docs/dev/table/streaming.md ---
    @@ -351,13 +351,109 @@ val windowedTable = tEnv
     Query Configuration
     -------------------
     
    -In stream processing, compuations are constantly happening and there are many use cases that require to update previously emitted results. There are many ways in which a query can compute and emit updates. These do not affect the semantics of the query but might lead to approximated results. 
    +Table API and SQL queries have the same semantics regardless whether their input is bounded batch input or unbounded stream input. In many cases, continuous queries on streaming input are capable of computing accurate results that are identical to offline computed results. However, this is not possible in general case because continuous queries have to restrict the size of state they maintain in order to avoid to run out of storage and to be able to process unbounded streaming data over a long period of time. Consequently, a continuous query might only be able to provide approximated results depending on the characteristics of the input data and the query itself.
     
    -Flink's Table API and SQL interface use a `QueryConfig` to control the computation and emission of results and updates.
    +Flink's Table API and SQL interface provide parameters to tune the accuracy and resource consumption of continuous queries. The parameters are specified via a `QueryConfig` object. The `QueryConfig` can be obtained from the `TableEnvironment` and is passed back when a `Table` is translated, i.e., when it is [transformed into a DataStream](common.html#convert-a-table-into-a-datastream-or-dataset) or [emitted via a TableSink](common.html#emit-a-table).
     
    -### State Retention
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    +
    +// obtain query configuration from TableEnvironment
    +StreamQueryConfig qConfig = tableEnv.queryConfig();
    +// set query parameters
    +qConfig.withIdleStateRetentionTime(Time.hours(12));
    +...
    +
    +// define query
    +Table result = ...
    +
    +// emit result Table via a TableSink
    +result.writeToSink(sink, qConfig);
    --- End diff --
    
    Should we add `TableSink<Row> sink = ...` here? Just a suggestion.



[GitHub] flink pull request #4256: [FLINK-6747] [docs] Add documentation for QueryCon...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4256#discussion_r125540625
  
    --- Diff: docs/dev/table/streaming.md ---
    @@ -351,13 +351,109 @@ val windowedTable = tEnv
     Query Configuration
     -------------------
     
    -In stream processing, compuations are constantly happening and there are many use cases that require to update previously emitted results. There are many ways in which a query can compute and emit updates. These do not affect the semantics of the query but might lead to approximated results. 
    +Table API and SQL queries have the same semantics regardless whether their input is bounded batch input or unbounded stream input. In many cases, continuous queries on streaming input are capable of computing accurate results that are identical to offline computed results. However, this is not possible in general case because continuous queries have to restrict the size of state they maintain in order to avoid to run out of storage and to be able to process unbounded streaming data over a long period of time. Consequently, a continuous query might only be able to provide approximated results depending on the characteristics of the input data and the query itself.
     
    -Flink's Table API and SQL interface use a `QueryConfig` to control the computation and emission of results and updates.
    +Flink's Table API and SQL interface provide parameters to tune the accuracy and resource consumption of continuous queries. The parameters are specified via a `QueryConfig` object. The `QueryConfig` can be obtained from the `TableEnvironment` and is passed back when a `Table` is translated, i.e., when it is [transformed into a DataStream](common.html#convert-a-table-into-a-datastream-or-dataset) or [emitted via a TableSink](common.html#emit-a-table).
     
    -### State Retention
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    +
    +// obtain query configuration from TableEnvironment
    +StreamQueryConfig qConfig = tableEnv.queryConfig();
    +// set query parameters
    +qConfig.withIdleStateRetentionTime(Time.hours(12));
    +...
    --- End diff --
    
    I think adding a line break here would read better, but I am fine with keeping the current style (same below).



[GitHub] flink pull request #4256: [FLINK-6747] [docs] Add documentation for QueryCon...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4256#discussion_r125553376
  
    --- Diff: docs/dev/table/streaming.md ---
    @@ -351,13 +351,109 @@ val windowedTable = tEnv
     Query Configuration
     -------------------
     
    -In stream processing, compuations are constantly happening and there are many use cases that require to update previously emitted results. There are many ways in which a query can compute and emit updates. These do not affect the semantics of the query but might lead to approximated results. 
    +Table API and SQL queries have the same semantics regardless whether their input is bounded batch input or unbounded stream input. In many cases, continuous queries on streaming input are capable of computing accurate results that are identical to offline computed results. However, this is not possible in general case because continuous queries have to restrict the size of state they maintain in order to avoid to run out of storage and to be able to process unbounded streaming data over a long period of time. Consequently, a continuous query might only be able to provide approximated results depending on the characteristics of the input data and the query itself.
     
    -Flink's Table API and SQL interface use a `QueryConfig` to control the computation and emission of results and updates.
    +Flink's Table API and SQL interface provide parameters to tune the accuracy and resource consumption of continuous queries. The parameters are specified via a `QueryConfig` object. The `QueryConfig` can be obtained from the `TableEnvironment` and is passed back when a `Table` is translated, i.e., when it is [transformed into a DataStream](common.html#convert-a-table-into-a-datastream-or-dataset) or [emitted via a TableSink](common.html#emit-a-table).
     
    -### State Retention
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    +
    +// obtain query configuration from TableEnvironment
    +StreamQueryConfig qConfig = tableEnv.queryConfig();
    +// set query parameters
    +qConfig.withIdleStateRetentionTime(Time.hours(12));
    +...
    +
    +// define query
    +Table result = ...
    +
    +// emit result Table via a TableSink
    +result.writeToSink(sink, qConfig);
    +
    +// convert result Table into a DataStream<Row>
    +DataStream<Row> stream = tableEnv.toAppendStream(result, Row.class, qConfig);
    +
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val env = StreamExecutionEnvironment.getExecutionEnvironment
    +val tableEnv = TableEnvironment.getTableEnvironment(env)
    +
    +// obtain query configuration from TableEnvironment
    +val qConfig: StreamQueryConfig = tableEnv.queryConfig
    +// set query parameters
    +qConfig.withIdleStateRetentionTime(Time.hours(12))
    +...
    +
    +// define query
    +val result: Table = ???
    +
    +// emit result Table via a TableSink
    +result.writeToSink(sink, qConfig)
    +
    +// convert result Table into a DataStream
    +val stream: DataStream[Row] = result.toAppendStream[Row](qConfig)
    +
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +In the the following we describe the parameters of the `QueryConfig` and how they affect the accuracy and resource consumption of a query.
    +
    +### Idle State Retention Time
    +
    +Many queries aggregate or join records on one or more key attributes. When such a query is executed on a stream, the resulting continuous query needs to collect records or maintain partial results per key. If the key domain of the input stream is evolving, i.e., the active key values are changing over time, the continuous query accumulates more and more state as distinct keys are observed. However, often keys become inactive after some time and their corresponding state becomes stale and useless.
    +
    +For example the following query computes the number of clicks per session.
    +
    +```
    +SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;
    +```
    +
    +The `sessionId` attribute is used as a grouping key and the continuous query maintains a count for each session it observes. The `sessionId` attribute is evolving over time and `sessionId` values are only active until the session ends, i.e., for a limited period of time. However, the continuous query cannot know about this property of `sessionId` and has to expect that any `sessionId` value can occur at any time. Therefore, it maintains the current count for each observed `sessionId` value. Consequently, the total state size of the query is continuously growing as more and more `sessionId` values are observed. 
    +
    +The *Idle State Retention Time* defines for how long the state of a key may not be updated before it is removed. For the previous example query this specifies the time for how long the count of a `seesionId` may not be updated before it is removed. 
    --- End diff --
    
    The *Idle State Retention Time* defines how long the state of a key is retained without being updated before it is removed.
    For the previous example query, the count of a `sessionId` is removed once it has not been updated for the configured retention time.
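
The retention behavior described in this comment can be sketched outside of Flink. The following is a minimal illustration under stated assumptions, not Flink's implementation: `IdleStateCounter`, `increment`, `size`, and the explicit `nowMillis` argument are hypothetical names introduced here, and stale state is cleaned up eagerly on every update for simplicity. It mimics the per-`sessionId` count of the example query and shows that a key whose state was removed starts counting from scratch again, which is how the result can become approximate.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of idle state retention for a per-key count,
 * similar in spirit to:
 *   SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId
 * This is NOT how Flink implements state cleanup.
 */
final class IdleStateCounter {

    /** Per-key state: the running count and the time of its last update. */
    private static final class Entry {
        long count;
        long lastUpdateMillis;
    }

    private final long retentionMillis;
    private final Map<String, Entry> state = new HashMap<>();

    IdleStateCounter(long retentionMillis) {
        this.retentionMillis = retentionMillis;
    }

    /** Counts one record for the key and returns the updated count. */
    long increment(String key, long nowMillis) {
        expire(nowMillis);
        Entry entry = state.computeIfAbsent(key, k -> new Entry());
        entry.count++;
        entry.lastUpdateMillis = nowMillis;
        return entry.count;
    }

    /** Drops the state of every key that has been idle for at least the retention time. */
    private void expire(long nowMillis) {
        state.values().removeIf(e -> nowMillis - e.lastUpdateMillis >= retentionMillis);
    }

    /** Number of keys that currently hold state. */
    int size() {
        return state.size();
    }

    public static void main(String[] args) {
        IdleStateCounter counter = new IdleStateCounter(1000L);
        System.out.println(counter.increment("s1", 0L));    // 1
        System.out.println(counter.increment("s1", 500L));  // 2
        // "s1" was idle for 1500 ms, so its count was removed and restarts.
        System.out.println(counter.increment("s1", 2000L)); // 1
    }
}
```

A longer retention time keeps counts accurate for keys that reappear late, at the cost of holding more state; a shorter one bounds the state size more tightly but makes restarts like the last call above more likely.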



[GitHub] flink issue #4256: [FLINK-6747] [docs] Add documentation for QueryConfig.

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/4256
  
    Thanks for the review @sunjincheng121!
    
    I addressed your points and will merge the PR.



[GitHub] flink pull request #4256: [FLINK-6747] [docs] Add documentation for QueryCon...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4256#discussion_r125538537
  
    --- Diff: docs/dev/table/streaming.md ---
    @@ -351,13 +351,109 @@ val windowedTable = tEnv
     Query Configuration
     -------------------
     
    -In stream processing, compuations are constantly happening and there are many use cases that require to update previously emitted results. There are many ways in which a query can compute and emit updates. These do not affect the semantics of the query but might lead to approximated results. 
    +Table API and SQL queries have the same semantics regardless whether their input is bounded batch input or unbounded stream input. In many cases, continuous queries on streaming input are capable of computing accurate results that are identical to offline computed results. However, this is not possible in general case because continuous queries have to restrict the size of state they maintain in order to avoid to run out of storage and to be able to process unbounded streaming data over a long period of time. Consequently, a continuous query might only be able to provide approximated results depending on the characteristics of the input data and the query itself.
     
    -Flink's Table API and SQL interface use a `QueryConfig` to control the computation and emission of results and updates.
    +Flink's Table API and SQL interface provide parameters to tune the accuracy and resource consumption of continuous queries. The parameters are specified via a `QueryConfig` object. The `QueryConfig` can be obtained from the `TableEnvironment` and is passed back when a `Table` is translated, i.e., when it is [transformed into a DataStream](common.html#convert-a-table-into-a-datastream-or-dataset) or [emitted via a TableSink](common.html#emit-a-table).
     
    -### State Retention
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    +
    +// obtain query configuration from TableEnvironment
    +StreamQueryConfig qConfig = tableEnv.queryConfig();
    +// set query parameters
    +qConfig.withIdleStateRetentionTime(Time.hours(12));
    +...
    +
    +// define query
    +Table result = ...
    +
    +// emit result Table via a TableSink
    +result.writeToSink(sink, qConfig);
    +
    +// convert result Table into a DataStream<Row>
    +DataStream<Row> stream = tableEnv.toAppendStream(result, Row.class, qConfig);
    +
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val env = StreamExecutionEnvironment.getExecutionEnvironment
    +val tableEnv = TableEnvironment.getTableEnvironment(env)
    +
    +// obtain query configuration from TableEnvironment
    +val qConfig: StreamQueryConfig = tableEnv.queryConfig
    +// set query parameters
    +qConfig.withIdleStateRetentionTime(Time.hours(12))
    +...
    +
    +// define query
    +val result: Table = ???
    +
    +// emit result Table via a TableSink
    +result.writeToSink(sink, qConfig)
    +
    +// convert result Table into a DataStream
    --- End diff --
    
    DataStream -> DataStream[Row] ?



[GitHub] flink pull request #4256: [FLINK-6747] [docs] Add documentation for QueryCon...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4256



[GitHub] flink pull request #4256: [FLINK-6747] [docs] Add documentation for QueryCon...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4256#discussion_r125539538
  
    --- Diff: docs/dev/table/streaming.md ---
    @@ -351,13 +351,109 @@ val windowedTable = tEnv
     Query Configuration
     -------------------
     
    -In stream processing, compuations are constantly happening and there are many use cases that require to update previously emitted results. There are many ways in which a query can compute and emit updates. These do not affect the semantics of the query but might lead to approximated results. 
    +Table API and SQL queries have the same semantics regardless whether their input is bounded batch input or unbounded stream input. In many cases, continuous queries on streaming input are capable of computing accurate results that are identical to offline computed results. However, this is not possible in general case because continuous queries have to restrict the size of state they maintain in order to avoid to run out of storage and to be able to process unbounded streaming data over a long period of time. Consequently, a continuous query might only be able to provide approximated results depending on the characteristics of the input data and the query itself.
     
    -Flink's Table API and SQL interface use a `QueryConfig` to control the computation and emission of results and updates.
    +Flink's Table API and SQL interface provide parameters to tune the accuracy and resource consumption of continuous queries. The parameters are specified via a `QueryConfig` object. The `QueryConfig` can be obtained from the `TableEnvironment` and is passed back when a `Table` is translated, i.e., when it is [transformed into a DataStream](common.html#convert-a-table-into-a-datastream-or-dataset) or [emitted via a TableSink](common.html#emit-a-table).
     
    -### State Retention
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    +
    +// obtain query configuration from TableEnvironment
    +StreamQueryConfig qConfig = tableEnv.queryConfig();
    +// set query parameters
    +qConfig.withIdleStateRetentionTime(Time.hours(12));
    +...
    +
    +// define query
    +Table result = ...
    +
    +// emit result Table via a TableSink
    +result.writeToSink(sink, qConfig);
    +
    +// convert result Table into a DataStream<Row>
    +DataStream<Row> stream = tableEnv.toAppendStream(result, Row.class, qConfig);
    +
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val env = StreamExecutionEnvironment.getExecutionEnvironment
    +val tableEnv = TableEnvironment.getTableEnvironment(env)
    +
    +// obtain query configuration from TableEnvironment
    +val qConfig: StreamQueryConfig = tableEnv.queryConfig
    +// set query parameters
    +qConfig.withIdleStateRetentionTime(Time.hours(12))
    +...
    +
    +// define query
    +val result: Table = ???
    +
    +// emit result Table via a TableSink
    +result.writeToSink(sink, qConfig)
    --- End diff --
    
    Maybe add a `sink: TableSink[Row] = ???` here. Just a suggestion.

