Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/01/15 08:27:32 UTC

[GitHub] [flink] twalthr commented on a change in pull request #14594: [FLINK-20551][docs] Make SQL documentation Blink only

twalthr commented on a change in pull request #14594:
URL: https://github.com/apache/flink/pull/14594#discussion_r558006245



##########
File path: docs/dev/table/common.md
##########
@@ -116,173 +106,119 @@ table_result...
 </div>
 </div>
 
-**Note:** Table API and SQL queries can be easily integrated with and embedded into DataStream or DataSet programs. Have a look at the [Integration with DataStream and DataSet API](#integration-with-datastream-and-dataset-api) section to learn how DataStreams and DataSets can be converted into Tables and vice versa.
+**Note:** Table API and SQL queries can be easily integrated with and embedded into DataStream programs.
+Have a look at the [Integration with DataStream](#integration-with-datastream) section to learn how DataStreams can be converted into Tables and vice versa.
 
 {% top %}
 
 Create a TableEnvironment
 -------------------------
 
-The `TableEnvironment` is a central concept of the Table API and SQL integration. It is responsible for:
+The `TableEnvironment` is the entry point for Table API and SQL integration and is responsible for:
 
 * Registering a `Table` in the internal catalog
 * Registering catalogs
 * Loading pluggable modules
 * Executing SQL queries
 * Registering a user-defined (scalar, table, or aggregation) function
-* Converting a `DataStream` or `DataSet` into a `Table`
-* Holding a reference to an `ExecutionEnvironment` or `StreamExecutionEnvironment`
-
-A `Table` is always bound to a specific `TableEnvironment`. It is not possible to combine tables of different TableEnvironments in the same query, e.g., to join or union them.
-
-A `TableEnvironment` is created by calling the static `BatchTableEnvironment.create()` or `StreamTableEnvironment.create()` method with a `StreamExecutionEnvironment` or an `ExecutionEnvironment` and an optional `TableConfig`. The `TableConfig` can be used to configure the `TableEnvironment` or to customize the query optimization and translation process (see [Query Optimization](#query-optimization)).
+* Converting a `DataStream` into a `Table`
+* Holding a reference to a `StreamExecutionEnvironment`
 
-Make sure to choose the specific planner `BatchTableEnvironment`/`StreamTableEnvironment` that matches your programming language.
-
-If both planner jars are on the classpath (the default behavior), you should explicitly set which planner to use in the current program.
+A `Table` is always bound to a specific `TableEnvironment`.
+It is not possible to combine tables of different TableEnvironments in the same query, e.g., to join or union them.
+A `TableEnvironment` is created by calling the static `TableEnvironment.create()` method.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-
-// **********************
-// FLINK STREAMING QUERY
-// **********************
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.TableEnvironment;
 
-EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
-StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
-StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
-// or TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);
+EnvironmentSettings settings = EnvironmentSettings
+    .newInstance()
+    .inStreamingMode()
+    //.inBatchMode()
+    .build();
 
-// ******************
-// FLINK BATCH QUERY
-// ******************
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
+TableEnvironment tEnv = TableEnvironment.create(settings);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
 
-ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
-BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);
+val settings = EnvironmentSettings
+    .newInstance()
+    .inStreamingMode()
+    //.inBatchMode()
+    .build()
 
-// **********************
-// BLINK STREAMING QUERY
-// **********************
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+val tEnv = TableEnvironment.create(settings)
+{% endhighlight %}
+</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# ***************
+# Streaming QUERY
+# ***************
 
-StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
-EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
-StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
-// or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.table import StreamTableEnvironment, EnvironmentSettings
 
-// ******************
-// BLINK BATCH QUERY
-// ******************
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.TableEnvironment;
+b_s_env = StreamExecutionEnvironment.get_execution_environment()
+b_s_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()

Review comment:
       remove `use_blink_planner()` everywhere and also adjust the naming `b_s_env`, `b_b_t_env`?

##########
File path: docs/dev/table/common.md
##########
@@ -116,173 +106,119 @@ table_result...
 </div>
 </div>
 
-**Note:** Table API and SQL queries can be easily integrated with and embedded into DataStream or DataSet programs. Have a look at the [Integration with DataStream and DataSet API](#integration-with-datastream-and-dataset-api) section to learn how DataStreams and DataSets can be converted into Tables and vice versa.
+**Note:** Table API and SQL queries can be easily integrated with and embedded into DataStream programs.
+Have a look at the [Integration with DataStream](#integration-with-datastream) section to learn how DataStreams can be converted into Tables and vice versa.
 
 {% top %}
 
 Create a TableEnvironment
 -------------------------
 
-The `TableEnvironment` is a central concept of the Table API and SQL integration. It is responsible for:
+The `TableEnvironment` is the entry point for Table API and SQL integration and is responsible for:
 
 * Registering a `Table` in the internal catalog
 * Registering catalogs
 * Loading pluggable modules
 * Executing SQL queries
 * Registering a user-defined (scalar, table, or aggregation) function
-* Converting a `DataStream` or `DataSet` into a `Table`
-* Holding a reference to an `ExecutionEnvironment` or `StreamExecutionEnvironment`
-
-A `Table` is always bound to a specific `TableEnvironment`. It is not possible to combine tables of different TableEnvironments in the same query, e.g., to join or union them.
-
-A `TableEnvironment` is created by calling the static `BatchTableEnvironment.create()` or `StreamTableEnvironment.create()` method with a `StreamExecutionEnvironment` or an `ExecutionEnvironment` and an optional `TableConfig`. The `TableConfig` can be used to configure the `TableEnvironment` or to customize the query optimization and translation process (see [Query Optimization](#query-optimization)).
+* Converting a `DataStream` into a `Table`
+* Holding a reference to a `StreamExecutionEnvironment`

Review comment:
       "In case of `StreamTableEnvironment` holding a reference to a `StreamExecutionEnvironment`"

##########
File path: docs/dev/table/common.md
##########
@@ -1185,43 +1037,48 @@ val table: Table = tableEnv.fromDataStream(stream, $"_2" as "myInt", $"_1" as "m
 
 #### Atomic Types
 
-Flink treats primitives (`Integer`, `Double`, `String`) or generic types (types that cannot be analyzed and decomposed) as atomic types. A `DataStream` or `DataSet` of an atomic type is converted into a `Table` with a single attribute. The type of the attribute is inferred from the atomic type and the name of the attribute can be specified.
+Flink treats primitives (`Integer`, `Double`, `String`) or generic types (types that cannot be analyzed and decomposed) as atomic types.
+A `DataStream` of an atomic type is converted into a `Table` with a single column.
+The type of the column is inferred from the atomic type and the name of the column can be specified.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
-StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
+StreamTableEnvironment tableEnv = ...;
 
 DataStream<Long> stream = ...
 
-// convert DataStream into Table with default field name "f0"
+// Convert DataStream into Table with default field name "f0"
 Table table = tableEnv.fromDataStream(stream);
 
-// convert DataStream into Table with field name "myLong"
+// Convert DataStream into Table with field name "myLong"
 Table table = tableEnv.fromDataStream(stream, $("myLong"));
 {% endhighlight %}
 </div>
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-// get a TableEnvironment
-val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
+val tableEnv: StreamTableEnvironment = ???
 
 val stream: DataStream[Long] = ...
 
-// convert DataStream into Table with default field name "f0"
+// Convert DataStream into Table with default field name "f0"

Review comment:
       side comment: it's funny how taste differs, I prefer lower case for inline comments :D

##########
File path: docs/dev/table/common.md
##########
@@ -116,173 +106,119 @@ table_result...
 </div>
 </div>
 
-**Note:** Table API and SQL queries can be easily integrated with and embedded into DataStream or DataSet programs. Have a look at the [Integration with DataStream and DataSet API](#integration-with-datastream-and-dataset-api) section to learn how DataStreams and DataSets can be converted into Tables and vice versa.
+**Note:** Table API and SQL queries can be easily integrated with and embedded into DataStream programs.
+Have a look at the [Integration with DataStream](#integration-with-datastream) section to learn how DataStreams can be converted into Tables and vice versa.
 
 {% top %}
 
 Create a TableEnvironment
 -------------------------
 
-The `TableEnvironment` is a central concept of the Table API and SQL integration. It is responsible for:
+The `TableEnvironment` is the entry point for Table API and SQL integration and is responsible for:
 
 * Registering a `Table` in the internal catalog
 * Registering catalogs
 * Loading pluggable modules
 * Executing SQL queries
 * Registering a user-defined (scalar, table, or aggregation) function
-* Converting a `DataStream` or `DataSet` into a `Table`
-* Holding a reference to an `ExecutionEnvironment` or `StreamExecutionEnvironment`
-
-A `Table` is always bound to a specific `TableEnvironment`. It is not possible to combine tables of different TableEnvironments in the same query, e.g., to join or union them.
-
-A `TableEnvironment` is created by calling the static `BatchTableEnvironment.create()` or `StreamTableEnvironment.create()` method with a `StreamExecutionEnvironment` or an `ExecutionEnvironment` and an optional `TableConfig`. The `TableConfig` can be used to configure the `TableEnvironment` or to customize the query optimization and translation process (see [Query Optimization](#query-optimization)).
+* Converting a `DataStream` into a `Table`
+* Holding a reference to a `StreamExecutionEnvironment`
 
-Make sure to choose the specific planner `BatchTableEnvironment`/`StreamTableEnvironment` that matches your programming language.
-
-If both planner jars are on the classpath (the default behavior), you should explicitly set which planner to use in the current program.
+A `Table` is always bound to a specific `TableEnvironment`.
+It is not possible to combine tables of different TableEnvironments in the same query, e.g., to join or union them.
+A `TableEnvironment` is created by calling the static `TableEnvironment.create()` method.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-
-// **********************
-// FLINK STREAMING QUERY
-// **********************
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.TableEnvironment;
 
-EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
-StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
-StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
-// or TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);
+EnvironmentSettings settings = EnvironmentSettings
+    .newInstance()
+    .inStreamingMode()
+    //.inBatchMode()
+    .build();
 
-// ******************
-// FLINK BATCH QUERY
-// ******************
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
+TableEnvironment tEnv = TableEnvironment.create(settings);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
 
-ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
-BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);
+val settings = EnvironmentSettings
+    .newInstance()
+    .inStreamingMode()
+    //.inBatchMode()
+    .build()
 
-// **********************
-// BLINK STREAMING QUERY
-// **********************
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+val tEnv = TableEnvironment.create(settings)
+{% endhighlight %}
+</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# ***************
+# Streaming QUERY
+# ***************
 
-StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
-EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
-StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
-// or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.table import StreamTableEnvironment, EnvironmentSettings
 
-// ******************
-// BLINK BATCH QUERY
-// ******************
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.TableEnvironment;
+b_s_env = StreamExecutionEnvironment.get_execution_environment()
+b_s_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
+b_s_t_env = StreamTableEnvironment.create(b_s_env, environment_settings=b_s_settings)
 
-EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
-TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
+# ***********
+# BATCH QUERY
+# ***********
 
+from pyflink.table import EnvironmentSettings, BatchTableEnvironment
+
+b_b_settings = EnvironmentSettings.new_instance().use_blink_planner().in_batch_mode().build()
+b_b_t_env = BatchTableEnvironment.create(environment_settings=b_b_settings)

Review comment:
       Isn't this using the legacy planner in the background? I thought the Python API also has a unified environment, no?

##########
File path: docs/dev/table/legacy_planner.md
##########
@@ -0,0 +1,341 @@
+---
+title: "Legacy Planner"
+nav-parent_id: tableapi
+nav-pos: 1001
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Table planners are responsible for translating relational operators into an executable, optimized Flink job.
+Flink supports two different planner implementations: the modern planner (sometimes referred to as `Blink`) and the legacy planner.
+For production use cases, we recommend the modern planner, which is the default.
+
+The legacy planner is in maintenance mode and no longer under active development.
+The primary reason to continue using the legacy planner is [DataSet]({% link dev/batch/index.md %}) interop.
+
+{% capture dataset_interop_note %}
+If you are not using the legacy planner for DataSet interop, the community strongly
+encourages you to consider the modern table planner.
+The legacy planner will be dropped at some point in the future.
+{% endcapture %}
+{% include warning.html content=dataset_interop_note %}
+
+This page describes how to use the legacy planner and where its semantics differ from the
+modern planner. 
+
+* This will be replaced by the TOC
+{:toc}
+
+## Setup
+
+### Dependencies
+
+When deploying to a cluster, the legacy planner is bundled in Flink's distribution by default.
+If you want to run Table API & SQL programs locally within your IDE, you must add the
+following set of modules to your application.
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-table-planner{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version}}</version>
+  <scope>provided</scope>
+</dependency>
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-streaming-scala{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version}}</version>
+  <scope>provided</scope>
+</dependency>
+{% endhighlight %}
+
+### Configuring the TableEnvironment 
+
+When creating a `TableEnvironment`, the legacy planner is configured via `EnvironmentSettings`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+EnvironmentSettings settings = EnvironmentSettings
+    .newInstance()
+    .useOldPlanner()
+    .inStreamingMode()
+    // or in batch mode
+    //.inBatchMode()
+    .build();
+
+TableEnvironment tEnv = TableEnvironment.create(settings);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val settings = EnvironmentSettings
+    .newInstance()
+    .useOldPlanner()
+    .inStreamingMode()
+    // or in batch mode
+    //.inBatchMode()
+    .build()
+
+val tEnv = TableEnvironment.create(settings)
+{% endhighlight %}
+</div>
+</div>
+
+`BatchTableEnvironment` may be used for [DataSet]({% link dev/batch/index.md %}) and [DataStream]({% link dev/datastream_api.md %}) interop.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment()
+val tEnv = BatchTableEnvironment.create(env)
+{% endhighlight %}
+</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+from pyflink.dataset import ExecutionEnvironment
+from pyflink.table import BatchTableEnvironment
+
+f_b_env = ExecutionEnvironment.get_execution_environment()
+f_b_t_env = BatchTableEnvironment.create(f_b_env)

Review comment:
       shorten the `f_b_t_env`?

##########
File path: docs/dev/table/common.md
##########
@@ -896,71 +805,57 @@ tableEnv.createTemporaryView("myTable", stream);
 tableEnv.createTemporaryView("myTable2", stream, $("myLong"), $("myString"));
 {% endhighlight %}
 </div>
-
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-// get TableEnvironment 
-// registration of a DataSet is equivalent
-val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
+val tableEnv: StreamTableEnvironment = ???
 
 val stream: DataStream[(Long, String)] = ...
 
-// register the DataStream as View "myTable" with fields "f0", "f1"
+// Register the DataStream as View "myTable" with fields "f0", "f1"
 tableEnv.createTemporaryView("myTable", stream)
 
-// register the DataStream as View "myTable2" with fields "myLong", "myString"
-tableEnv.createTemporaryView("myTable2", stream, 'myLong, 'myString)
+// Register the DataStream as View "myTable2" with fields "myLong", "myString"
+tableEnv.createTemporaryView("myTable2", stream, $"myLong", $"myString");
 {% endhighlight %}
 </div>
 </div>
 
 {% top %}
 
-### Convert a DataStream or DataSet into a Table
+### Convert a DataStream into a Table
 
-Instead of registering a `DataStream` or `DataSet` in a `TableEnvironment`, it can also be directly converted into a `Table`. This is convenient if you want to use the Table in a Table API query. 
+A `DataStream` can be directly converted to a `Table` in a `StreamTableEnvironment`.
+The schema of the resulting table depends on the data type of the converted stream.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-// get StreamTableEnvironment
-// registration of a DataSet in a BatchTableEnvironment is equivalent
-StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
-
+StreamTableEnvironment tableEnv = ...; 
 DataStream<Tuple2<Long, String>> stream = ...
 
-// Convert the DataStream into a Table with default fields "f0", "f1"
-Table table1 = tableEnv.fromDataStream(stream);
-
-// Convert the DataStream into a Table with fields "myLong", "myString"
 Table table2 = tableEnv.fromDataStream(stream, $("myLong"), $("myString"));
 {% endhighlight %}
 </div>
-
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-// get TableEnvironment
-// registration of a DataSet is equivalent
-val tableEnv = ... // see "Create a TableEnvironment" section
-
-val stream: DataStream[(Long, String)] = ...
+val tableEnv: StreamTableEnvironment = ???
+val stream: DataStream[(Long, String)] = ???
 
-// convert the DataStream into a Table with default fields "_1", "_2"
-val table1: Table = tableEnv.fromDataStream(stream)
-
-// convert the DataStream into a Table with fields "myLong", "myString"
 val table2: Table = tableEnv.fromDataStream(stream, $"myLong", $"myString")
 {% endhighlight %}
 </div>
 </div>
 
 {% top %}
 
-### Convert a Table into a DataStream or DataSet
+### Convert a Table into a DataStream 
 
-A `Table` can be converted into a `DataStream` or `DataSet`. In this way, custom DataStream or DataSet programs can be run on the result of a Table API or SQL query.
+The results of a `Table` can be converted into a `DataStream`.
+In this way, custom `DataStream` programs can be run on the result of a Table API or SQL query.

Review comment:
       nit: "`DataStream`"

##########
File path: docs/dev/table/types.md
##########
@@ -186,126 +152,6 @@ a table program (e.g. `field.cast(TIMESTAMP(3).bridgedTo(Timestamp.class))`) are
 </div>
 </div>

Review comment:
       I would not drop the Blink planner table yet. We still don't support all SQL DataTypes. Or we add the `Remarks for Data Type` column to each subsection.

##########
File path: docs/dev/table/legacy_planner.md
##########
@@ -0,0 +1,341 @@
+---
+title: "Legacy Planner"
+nav-parent_id: tableapi
+nav-pos: 1001
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Table planners are responsible for translating relational operators into an executable, optimized Flink job.
+Flink supports two different planner implementations: the modern planner (sometimes referred to as `Blink`) and the legacy planner.
+For production use cases, we recommend the modern planner, which is the default.
+
+The legacy planner is in maintenance mode and no longer under active development.
+The primary reason to continue using the legacy planner is [DataSet]({% link dev/batch/index.md %}) interop.
+
+{% capture dataset_interop_note %}
+If you are not using the legacy planner for DataSet interop, the community strongly
+encourages you to consider the modern table planner.
+The legacy planner will be dropped at some point in the future.
+{% endcapture %}
+{% include warning.html content=dataset_interop_note %}
+
+This page describes how to use the legacy planner and where its semantics differ from the
+modern planner. 
+
+* This will be replaced by the TOC
+{:toc}
+
+## Setup
+
+### Dependencies
+
+When deploying to a cluster, the legacy planner is bundled in Flink's distribution by default.
+If you want to run Table API & SQL programs locally within your IDE, you must add the
+following set of modules to your application.
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-table-planner{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version}}</version>
+  <scope>provided</scope>
+</dependency>
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-streaming-scala{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version}}</version>
+  <scope>provided</scope>
+</dependency>
+{% endhighlight %}
+
+### Configuring the TableEnvironment 
+
+When creating a `TableEnvironment`, the legacy planner is configured via `EnvironmentSettings`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+EnvironmentSettings settings = EnvironmentSettings
+    .newInstance()
+    .useOldPlanner()
+    .inStreamingMode()
+    // or in batch mode
+    //.inBatchMode()
+    .build();
+
+TableEnvironment tEnv = TableEnvironment.create(settings);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val settings = EnvironmentSettings
+    .newInstance()
+    .useOldPlanner()
+    .inStreamingMode()
+    // or in batch mode
+    //.inBatchMode()
+    .build()
+
+val tEnv = TableEnvironment.create(settings)
+{% endhighlight %}
+</div>
+</div>
+
+`BatchTableEnvironment` may be used for [DataSet]({% link dev/batch/index.md %}) and [DataStream]({% link dev/datastream_api.md %}) interop.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment()
+val tEnv = BatchTableEnvironment.create(env)
+{% endhighlight %}
+</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+from pyflink.dataset import ExecutionEnvironment
+from pyflink.table import BatchTableEnvironment
+
+f_b_env = ExecutionEnvironment.get_execution_environment()
+f_b_t_env = BatchTableEnvironment.create(f_b_env)
+{% endhighlight %}
+</div>
+</div>
+
+## Integration with DataSet
+
+The primary use case for the legacy planner is interoperation with the DataSet API.
+To translate `DataSet`s to and from tables, applications must use the `BatchTableEnvironment`.
+
+### Create a View from a DataSet
+
+A `DataSet` can be registered in a `BatchTableEnvironment` as a `View`.
+The schema of the resulting view depends on the data type of the registered collection.
+
+**Note:** Views created from a `DataSet` can be registered as temporary views only.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+BatchTableEnvironment tEnv = ...; 
+DataSet<Tuple2<Long, String>> dataset = ...;
+
+tEnv.createTemporaryView("my-table", dataset, $("myLong"), $("myString"));
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
{% highlight scala %}
+val tEnv: BatchTableEnvironment = ??? 
+val dataset: DataSet[(Long, String)] = ???
+
+tEnv.createTemporaryView("my-table", dataset, $"myLong", $"myString")
+{% endhighlight %}
+</div>
+</div>
+
+### Create a Table from a DataSet
+
+A `DataSet` can be directly converted to a `Table` in a `BatchTableEnvironment`.
+The schema of the resulting table depends on the data type of the converted collection.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+BatchTableEnvironment tEnv = ...; 
+DataSet<Tuple2<Long, String>> dataset = ...;
+
+Table myTable = tEnv.fromDataSet(dataset, $("myLong"), $("myString"));
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
{% highlight scala %}
+val tEnv: BatchTableEnvironment = ??? 
+val dataset: DataSet[(Long, String)] = ???
+
+val table = tEnv.fromDataSet(dataset, $"myLong", $"myString")
+{% endhighlight %}
+</div>
+</div>
+
+### Convert a Table to a DataSet
+
+A `Table` can be converted to a `DataSet`.
+In this way, custom `DataSet` programs can be run on the result of a Table API or SQL query.
+
+When converting from a `Table`, users must specify the data type of the results.
+Often the most convenient conversion type is `Row`.
+The following list gives an overview of the features of the different options.
+
+- **Row**: fields are mapped by position, arbitrary number of fields, support for `null` values, no type-safe access.
+- **POJO**: fields are mapped by name (POJO fields must match the names of the `Table` fields), arbitrary number of fields, support for `null` values, type-safe access; a sketch follows the example below.
+- **Case Class**: fields are mapped by position, no support for `null` values, type-safe access.
+- **Tuple**: fields are mapped by position, limitation to 22 (Scala) or 25 (Java) fields, no support for `null` values, type-safe access.
+- **Atomic Type**: `Table` must have a single field, no support for `null` values, type-safe access.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
+
+Table table = tableEnv.fromValues(
+    DataTypes.ROW(
+        DataTypes.FIELD("name", DataTypes.STRING()),
+        DataTypes.FIELD("age", DataTypes.INT())),
+    row("john", 35),
+    row("sarah", 32));
+
+// Convert the Table into a DataSet of Row by specifying a class
+DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class);
+
+// Convert the Table into a DataSet of Tuple2<String, Integer> via a TypeInformation
+TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(Types.STRING(), Types.INT());
+DataSet<Tuple2<String, Integer>> dsTuple = tableEnv.toDataSet(table, tupleType);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val tableEnv = BatchTableEnvironment.create(env)
+
+val table = tableEnv.fromValues(
+    DataTypes.ROW(
+        DataTypes.FIELD("name", DataTypes.STRING()),
+        DataTypes.FIELD("age", DataTypes.INT())),
+    row("john", 35),
+    row("sarah", 32))
+
+// Convert the Table into a DataSet of Row
+val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)
+
+// Convert the Table into a DataSet of Tuple2[String, Int]
+val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)
+{% endhighlight %}
+</div>
+</div>
+
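+For illustration, a conversion to a POJO might look as follows; the `Person`
+class is a hypothetical example whose public fields match the column names.
+
+{% highlight java %}
+// hypothetical POJO; fields are mapped by name ("name", "age")
+public static class Person {
+    public String name;
+    public Integer age;
+}
+
+DataSet<Person> dsPojo = tableEnv.toDataSet(table, Person.class);
+{% endhighlight %}
+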
+<span class="label label-danger">Attention</span> **Once the Table is converted to a DataSet, we must use the ExecutionEnvironment.execute method to execute the DataSet program.**
+
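+A hedged sketch of triggering execution after the conversion (the sink path is illustrative):
+
+{% highlight java %}
+// env is the ExecutionEnvironment the BatchTableEnvironment was created with
+dsRow.writeAsText("/tmp/rows");
+
+// the DataSet program only runs once execute() is called
+env.execute("consume converted table");
+{% endhighlight %}
+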
+## Data Types
+
+<div class="codetabs" data-hide-tabs="1" markdown="1">
+<div data-lang="Java/Scala" markdown="1">
+The legacy planner, introduced before Flink 1.9, primarily supports type information.
+It has only limited support for data types.
+It is possible to declare data types that can be translated into type information such that the legacy planner understands them.
+
+The following table summarizes the difference between data type and type information.
+Most simple types, as well as the row type, remain the same.
+Time types, array types, and the decimal type need special attention.
+Hints other than the ones mentioned are not allowed.
+
+For the *Type Information* column the table omits the prefix `org.apache.flink.table.api.Types`.
+
+For the *Data Type Representation* column the table omits the prefix `org.apache.flink.table.api.DataTypes`.
+
+| Type Information | Java Expression String | Data Type Representation | Remarks for Data Type |
+|:-----------------|:-----------------------|:-------------------------|:----------------------|
+| `STRING()` | `STRING` | `STRING()` | |
+| `BOOLEAN()` | `BOOLEAN` | `BOOLEAN()` | |
+| `BYTE()` | `BYTE` | `TINYINT()` | |
+| `SHORT()` | `SHORT` | `SMALLINT()` | |
+| `INT()` | `INT` | `INT()` | |
+| `LONG()` | `LONG` | `BIGINT()` | |
+| `FLOAT()` | `FLOAT` | `FLOAT()` | |
+| `DOUBLE()` | `DOUBLE` | `DOUBLE()` | |
+| `ROW(...)` | `ROW<...>` | `ROW(...)` | |
+| `BIG_DEC()` | `DECIMAL` | [`DECIMAL()`] | Not a 1:1 mapping as precision and scale are ignored and Java's variable precision and scale are used. |
+| `SQL_DATE()` | `SQL_DATE` | `DATE()`<br>`.bridgedTo(java.sql.Date.class)` | |
+| `SQL_TIME()` | `SQL_TIME` | `TIME(0)`<br>`.bridgedTo(java.sql.Time.class)` | |
+| `SQL_TIMESTAMP()` | `SQL_TIMESTAMP` | `TIMESTAMP(3)`<br>`.bridgedTo(java.sql.Timestamp.class)` | |
+| `INTERVAL_MONTHS()` | `INTERVAL_MONTHS` | `INTERVAL(MONTH())`<br>`.bridgedTo(Integer.class)` | |
+| `INTERVAL_MILLIS()` | `INTERVAL_MILLIS` | `INTERVAL(DataTypes.SECOND(3))`<br>`.bridgedTo(Long.class)` | |
+| `PRIMITIVE_ARRAY(...)` | `PRIMITIVE_ARRAY<...>` | `ARRAY(DATATYPE.notNull()`<br>`.bridgedTo(PRIMITIVE.class))` | Applies to all JVM primitive types except for `byte`. |
+| `PRIMITIVE_ARRAY(BYTE())` | `PRIMITIVE_ARRAY<BYTE>` | `BYTES()` | |
+| `OBJECT_ARRAY(...)` | `OBJECT_ARRAY<...>` | `ARRAY(`<br>`DATATYPE.bridgedTo(OBJECT.class))` | |
+| `MULTISET(...)` | | `MULTISET(...)` | |
+| `MAP(..., ...)` | `MAP<...,...>` | `MAP(...)` | |
+| other generic types | | `RAW(...)` | |
+
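+As a concrete instance of one row from this table, a hedged example mirroring the `SQL_TIMESTAMP()` mapping above:
+
+{% highlight java %}
+// legacy type information (Types = org.apache.flink.table.api.Types)
+TypeInformation<java.sql.Timestamp> legacyType = Types.SQL_TIMESTAMP();
+
+// equivalent data type, bridged to the same conversion class
+DataType dataType = DataTypes.TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class);
+{% endhighlight %}
+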
+</div>

Review comment:
       re-add the note that people can fall back to type information at any time if the DataType mapping is too complicated




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org