You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/07/31 11:54:41 UTC

[flink] branch master updated (773157e -> fa4ee95)

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 773157e  [FLINK-10725][build] Initialize JDK 11 profile
     new f1a2312  [table-api-java] Postpone check for Blink planner in StreamTableEnvironment
     new 6fc02d1  [FLINK-13273][table-planner-blink] Allow retrieving StreamGraph from Blink executor
     new ba37bd4  [hotfix][table-planner-blink] Update Blink's auto-complete hints
     new 4bb72d0  [hotfix][table-planner-blink] Update Blink's ExpressionReducer classloading to Flink's behavior
     new fea9064  [FLINK-13273][sql-client] Allow switching planners in SQL Client
     new fa4ee95  [hotfix][table] Add missing annotations to factories

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/dev/table/sqlClient.md                        |  53 ++++++-----
 flink-end-to-end-tests/run-nightly-tests.sh        |   3 +-
 .../test-scripts/test_sql_client.sh                |  17 ++--
 .../flink-sql-client/conf/sql-client-defaults.yaml |  40 ++++----
 flink-table/flink-sql-client/pom.xml               |  12 +++
 .../client/config/entries/ExecutionEntry.java      |  90 +++++++++++++++---
 .../flink/table/client/gateway/SessionContext.java |   8 +-
 .../gateway/local/CollectBatchTableSink.java       |  19 ++--
 .../gateway/local/CollectStreamTableSink.java      |   8 +-
 .../client/gateway/local/ExecutionContext.java     | 104 ++++++++++++++++++---
 .../table/client/gateway/local/LocalExecutor.java  |   3 +-
 .../table/client/gateway/local/ResultStore.java    |   2 +-
 .../client/gateway/local/EnvironmentTest.java      |   3 +-
 .../client/gateway/local/ExecutionContextTest.java |   2 +
 .../client/gateway/local/LocalExecutorITCase.java  |  26 +++++-
 .../test/resources/test-sql-client-defaults.yaml   |  15 +--
 .../java/internal/StreamTableEnvironmentImpl.java  |  12 +--
 .../table/factories/BatchTableSinkFactory.java     |   2 +
 .../table/factories/BatchTableSourceFactory.java   |   2 +
 .../table/factories/StreamTableSinkFactory.java    |   2 +
 .../table/factories/StreamTableSourceFactory.java  |   2 +
 .../table/planner/delegation/BatchExecutor.java    |   6 +-
 .../table/planner/delegation/ExecutorBase.java     |  14 ++-
 .../table/planner/delegation/StreamExecutor.java   |   5 +-
 .../table/planner/calcite/FlinkPlannerImpl.scala   |   2 +-
 .../table/planner/codegen/ExpressionReducer.scala  |   2 +-
 tools/travis/splits/split_misc.sh                  |   3 +-
 27 files changed, 336 insertions(+), 121 deletions(-)


[flink] 02/06: [FLINK-13273][table-planner-blink] Allow retrieving StreamGraph from Blink executor

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6fc02d1cb956ca4fe1709f032906d3dc80fb5a6c
Author: Timo Walther <tw...@apache.org>
AuthorDate: Mon Jul 29 11:42:20 2019 +0200

    [FLINK-13273][table-planner-blink] Allow retrieving StreamGraph from Blink executor
---
 .../flink/table/planner/delegation/BatchExecutor.java      |  6 ++----
 .../flink/table/planner/delegation/ExecutorBase.java       | 14 +++++++++++---
 .../flink/table/planner/delegation/StreamExecutor.java     |  5 ++---
 3 files changed, 15 insertions(+), 10 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/BatchExecutor.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/BatchExecutor.java
index de7fd3c..7bf4367 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/BatchExecutor.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/BatchExecutor.java
@@ -51,7 +51,7 @@ public class BatchExecutor extends ExecutorBase {
 	@Override
 	public JobExecutionResult execute(String jobName) throws Exception {
 		StreamExecutionEnvironment execEnv = getExecutionEnvironment();
-		StreamGraph streamGraph = generateStreamGraph(transformations, jobName);
+		StreamGraph streamGraph = generateStreamGraph(jobName);
 		return execEnv.execute(streamGraph);
 	}
 
@@ -69,9 +69,7 @@ public class BatchExecutor extends ExecutorBase {
 		}
 	}
 
-	/**
-	 * Translates transformationList to streamGraph.
-	 */
+	@Override
 	public StreamGraph generateStreamGraph(List<Transformation<?>> transformations, String jobName) {
 		StreamExecutionEnvironment execEnv = getExecutionEnvironment();
 		setBatchProperties(execEnv);
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ExecutorBase.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ExecutorBase.java
index 701a6cc..10eeafd 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ExecutorBase.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ExecutorBase.java
@@ -58,9 +58,17 @@ public abstract class ExecutorBase implements Executor {
 		return executionEnvironment;
 	}
 
-	public abstract StreamGraph generateStreamGraph(
-			List<Transformation<?>> transformations,
-			String jobName) throws Exception;
+	/**
+	 * Translates the applied transformations to a stream graph.
+	 */
+	public StreamGraph generateStreamGraph(String jobName) {
+		return generateStreamGraph(transformations, jobName);
+	}
+
+	/**
+	 * Translates the given transformations to a stream graph.
+	 */
+	public abstract StreamGraph generateStreamGraph(List<Transformation<?>> transformations, String jobName);
 
 	protected String getNonEmptyJobName(String jobName) {
 		if (StringUtils.isNullOrWhitespaceOnly(jobName)) {
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/StreamExecutor.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/StreamExecutor.java
index 4af2f8e..8d1e904 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/StreamExecutor.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/StreamExecutor.java
@@ -46,9 +46,8 @@ public class StreamExecutor extends ExecutorBase {
 		return execEnv.execute(generateStreamGraph(transformations, jobName));
 	}
 
-	public StreamGraph generateStreamGraph(
-			List<Transformation<?>> transformations,
-			String jobName) throws Exception {
+	@Override
+	public StreamGraph generateStreamGraph(List<Transformation<?>> transformations, String jobName) {
 		transformations.forEach(getExecutionEnvironment()::addOperator);
 		return getExecutionEnvironment().getStreamGraph(getNonEmptyJobName(jobName));
 	}


[flink] 03/06: [hotfix][table-planner-blink] Update Blink's auto-complete hints

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ba37bd4c6a4c3ae78111d44645f6b2a67bf93e56
Author: Timo Walther <tw...@apache.org>
AuthorDate: Mon Jul 29 11:43:51 2019 +0200

    [hotfix][table-planner-blink] Update Blink's auto-complete hints
---
 .../scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
index 19041ec..0f8bd03 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
@@ -78,7 +78,7 @@ class FlinkPlannerImpl(
       catalogReaderSupplier.apply(true), // ignore cases for lenient completion
       typeFactory,
       config.getParserConfig.conformance())
-    val advisor = new SqlAdvisor(advisorValidator)
+    val advisor = new SqlAdvisor(advisorValidator, config.getParserConfig)
     val replaced = Array[String](null)
     val hints = advisor.getCompletionHints(sql, cursor, replaced)
       .map(item => item.toIdentifier.toString)


[flink] 06/06: [hotfix][table] Add missing annotations to factories

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fa4ee95f07bcb58f1f6d1ca6fa04a37f771e05c0
Author: Timo Walther <tw...@apache.org>
AuthorDate: Tue Jul 30 18:27:02 2019 +0200

    [hotfix][table] Add missing annotations to factories
---
 .../java/org/apache/flink/table/factories/BatchTableSinkFactory.java    | 2 ++
 .../java/org/apache/flink/table/factories/BatchTableSourceFactory.java  | 2 ++
 .../java/org/apache/flink/table/factories/StreamTableSinkFactory.java   | 2 ++
 .../java/org/apache/flink/table/factories/StreamTableSourceFactory.java | 2 ++
 4 files changed, 8 insertions(+)

diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/BatchTableSinkFactory.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/BatchTableSinkFactory.java
index 910dc82..13dcc71 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/BatchTableSinkFactory.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/BatchTableSinkFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.factories;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.sinks.BatchTableSink;
 import org.apache.flink.table.sinks.TableSink;
 
@@ -29,6 +30,7 @@ import java.util.Map;
  *
  * @param <T> type of records that the factory consumes
  */
+@PublicEvolving
 public interface BatchTableSinkFactory<T> extends TableSinkFactory<T> {
 
 	/**
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/BatchTableSourceFactory.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/BatchTableSourceFactory.java
index 05db20b..267385d 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/BatchTableSourceFactory.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/BatchTableSourceFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.factories;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.sources.BatchTableSource;
 import org.apache.flink.table.sources.TableSource;
 
@@ -29,6 +30,7 @@ import java.util.Map;
  *
  * @param <T> type of records that the factory produces
  */
+@PublicEvolving
 public interface BatchTableSourceFactory<T> extends TableSourceFactory<T> {
 
 	/**
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/StreamTableSinkFactory.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/StreamTableSinkFactory.java
index be25844..86c7e82 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/StreamTableSinkFactory.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/StreamTableSinkFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.factories;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.sinks.StreamTableSink;
 import org.apache.flink.table.sinks.TableSink;
 
@@ -29,6 +30,7 @@ import java.util.Map;
  *
  * @param <T> type of records that the factory consumes
  */
+@PublicEvolving
 public interface StreamTableSinkFactory<T> extends TableSinkFactory<T> {
 
 	/**
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/StreamTableSourceFactory.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/StreamTableSourceFactory.java
index 26b5e9c..b007d7b 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/StreamTableSourceFactory.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/StreamTableSourceFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.factories;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.table.sources.TableSource;
 
@@ -29,6 +30,7 @@ import java.util.Map;
  *
  * @param <T> type of records that the factory produces
  */
+@PublicEvolving
 public interface StreamTableSourceFactory<T> extends TableSourceFactory<T> {
 
 	/**


[flink] 05/06: [FLINK-13273][sql-client] Allow switching planners in SQL Client

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fea90644005d32af5a5b8ff79c2baafad687d3f7
Author: Timo Walther <tw...@apache.org>
AuthorDate: Mon Jul 29 14:30:10 2019 +0200

    [FLINK-13273][sql-client] Allow switching planners in SQL Client
    
    This closes #9266.
---
 docs/dev/table/sqlClient.md                        |  53 ++++++-----
 flink-end-to-end-tests/run-nightly-tests.sh        |   3 +-
 .../test-scripts/test_sql_client.sh                |  17 ++--
 .../flink-sql-client/conf/sql-client-defaults.yaml |  40 ++++----
 flink-table/flink-sql-client/pom.xml               |  12 +++
 .../client/config/entries/ExecutionEntry.java      |  90 +++++++++++++++---
 .../flink/table/client/gateway/SessionContext.java |   8 +-
 .../gateway/local/CollectBatchTableSink.java       |  19 ++--
 .../gateway/local/CollectStreamTableSink.java      |   8 +-
 .../client/gateway/local/ExecutionContext.java     | 104 ++++++++++++++++++---
 .../table/client/gateway/local/LocalExecutor.java  |   3 +-
 .../table/client/gateway/local/ResultStore.java    |   2 +-
 .../client/gateway/local/EnvironmentTest.java      |   3 +-
 .../client/gateway/local/ExecutionContextTest.java |   2 +
 .../client/gateway/local/LocalExecutorITCase.java  |  26 +++++-
 .../test/resources/test-sql-client-defaults.yaml   |  15 +--
 tools/travis/splits/split_misc.sh                  |   3 +-
 17 files changed, 306 insertions(+), 102 deletions(-)

diff --git a/docs/dev/table/sqlClient.md b/docs/dev/table/sqlClient.md
index cd69cce..8ec569d 100644
--- a/docs/dev/table/sqlClient.md
+++ b/docs/dev/table/sqlClient.md
@@ -199,9 +199,24 @@ functions:
       - 7.6
       - false
 
+# Define available catalogs
+
+catalogs:
+   - name: catalog_1
+     type: hive
+     property-version: 1
+     hive-conf-dir: ...
+   - name: catalog_2
+     type: hive
+     property-version: 1
+     default-database: mydb2
+     hive-conf-dir: ...
+     hive-version: 1.2.1
+
 # Execution properties allow for changing the behavior of a table program.
 
 execution:
+  planner: old                      # optional: either 'old' (default) or 'blink'
   type: streaming                   # required: execution mode either 'batch' or 'streaming'
   result-mode: table                # required: either 'table' or 'changelog'
   max-table-result-rows: 1000000    # optional: maximum number of maintained rows in
@@ -212,29 +227,16 @@ execution:
   max-parallelism: 16               # optional: Flink's maximum parallelism (128 by default)
   min-idle-state-retention: 0       # optional: table program's minimum idle state time
   max-idle-state-retention: 0       # optional: table program's maximum idle state time
+  current-catalog: catalog_1        # optional: name of the current catalog of the session ('default_catalog' by default)
+  current-database: mydb1           # optional: name of the current database of the current catalog
+                                    #   (default database of the current catalog by default)
   restart-strategy:                 # optional: restart strategy
     type: fallback                  #   "fallback" to global restart strategy by default
-  current-catalog: catalog_1        # optional: name of the current catalog of the session ("default_catalog" by default)
-  current-database: mydb1           # optional: name of the current database of the current catalog (default value is the default database name of the current catalog)
 
 # Deployment properties allow for describing the cluster to which table programs are submitted to.
 
 deployment:
   response-timeout: 5000
-
-# Catalogs
-
-catalogs:
-   - name: catalog_1
-     type: hive
-     property-version: 1
-     hive-conf-dir: ...
-   - name: catalog_2
-     type: hive
-     property-version: 1
-     default-database: mydb2        # optional: name of default database of this catalog
-     hive-conf-dir: ...             # optional: path of Hive conf directory. (Default value is created by HiveConf)
-     hive-version: 1.2.1            # optional: version of Hive (2.3.4 by default)
 {% endhighlight %}
 
 This configuration:
@@ -242,11 +244,10 @@ This configuration:
 - defines an environment with a table source `MyTableSource` that reads from a CSV file,
 - defines a view `MyCustomView` that declares a virtual table using a SQL query,
 - defines a user-defined function `myUDF` that can be instantiated using the class name and two constructor parameters,
+- connects to two Hive catalogs and uses `catalog_1` as the current catalog with `mydb1` as the current database of the catalog,
 - specifies a parallelism of 1 for queries executed in this streaming environment,
 - specifies an event-time characteristic, and
 - runs queries in the `table` result mode.
-- creates two `HiveCatalog` (type: hive) named with their own default databases and specified Hive conf directory. Hive version of the first `HiveCatalog` is `2.3.4` by default and that of the second one is specified as `1.2.1`.
-- use `catalog_1` as the current catalog of the environment upon start, and `mydb1` as the current database of the catalog.
 
 Depending on the use case, a configuration can be split into multiple files. Therefore, environment files can be created for general purposes (*defaults environment file* using `--defaults`) as well as on a per-session basis (*session environment file* using `--environment`). Every CLI session is initialized with the default properties followed by the session properties. For example, the defaults environment file could specify all table sources that should be available for querying in ev [...]
 
@@ -431,16 +432,11 @@ This process can be recursively performed until all the constructor parameters a
 Catalogs
 --------
 
-Catalogs can be defined as a set of yaml properties and are automatically registered to the environment upon starting SQL Client.
+Catalogs can be defined as a set of YAML properties and are automatically registered to the environment upon starting SQL Client.
 
-Users can specify in section `execution` that which catalog they want to use as the current catalog in SQL CLI, and which database of the catalog they want to use as the current database. 
+Users can specify which catalog they want to use as the current catalog in SQL CLI, and which database of the catalog they want to use as the current database.
 
 {% highlight yaml %}
-execution:
-   ...
-   current-catalog: catalog_1
-   current-database: mydb1
-
 catalogs:
    - name: catalog_1
      type: hive
@@ -452,9 +448,12 @@ catalogs:
      type: hive
      property-version: 1
      hive-conf-dir: <path of Hive conf directory>
-{% endhighlight %}
 
-Currently Flink supports two types of catalog - `FlinkInMemoryCatalog` and `HiveCatalog`.
+execution:
+   ...
+   current-catalog: catalog_1
+   current-database: mydb1
+{% endhighlight %}
 
 For more information about catalog, see [Catalogs]({{ site.baseurl }}/dev/table/catalog.html).
 
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index 653803a..b8a1a40 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -146,7 +146,8 @@ run_test "Avro Confluent Schema Registry nightly end-to-end test" "$END_TO_END_D
 run_test "State TTL Heap backend end-to-end test" "$END_TO_END_DIR/test-scripts/test_stream_state_ttl.sh file"
 run_test "State TTL RocksDb backend end-to-end test" "$END_TO_END_DIR/test-scripts/test_stream_state_ttl.sh rocks"
 
-run_test "SQL Client end-to-end test" "$END_TO_END_DIR/test-scripts/test_sql_client.sh"
+run_test "SQL Client end-to-end test (Old planner)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh old"
+run_test "SQL Client end-to-end test (Blink planner)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh blink"
 run_test "SQL Client end-to-end test for Kafka 0.10" "$END_TO_END_DIR/test-scripts/test_sql_client_kafka010.sh"
 run_test "SQL Client end-to-end test for Kafka 0.11" "$END_TO_END_DIR/test-scripts/test_sql_client_kafka011.sh"
 run_test "SQL Client end-to-end test for modern Kafka" "$END_TO_END_DIR/test-scripts/test_sql_client_kafka.sh"
diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client.sh b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
index 3503054..5798545 100755
--- a/flink-end-to-end-tests/test-scripts/test_sql_client.sh
+++ b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
@@ -19,6 +19,8 @@
 
 set -Eeuo pipefail
 
+PLANNER="${1:-old}"
+
 KAFKA_VERSION="2.2.0"
 CONFLUENT_VERSION="5.0.0"
 CONFLUENT_MAJOR_VERSION="5.0"
@@ -185,6 +187,9 @@ functions:
   - name: RegReplace
     from: class
     class: org.apache.flink.table.toolbox.StringRegexReplaceFunction
+
+execution:
+  planner: "$PLANNER"
 EOF
 
 # submit SQL statements
@@ -194,7 +199,7 @@ echo "Executing SQL: Values -> Elasticsearch (upsert)"
 SQL_STATEMENT_3=$(cat << EOF
 INSERT INTO ElasticsearchUpsertSinkTable
   SELECT user_id, user_name, COUNT(*) AS user_count
-  FROM (VALUES (1, 'Bob'), (22, 'Alice'), (42, 'Greg'), (42, 'Greg'), (42, 'Greg'), (1, 'Bob'))
+  FROM (VALUES (1, 'Bob'), (22, 'Tom'), (42, 'Kim'), (42, 'Kim'), (42, 'Kim'), (1, 'Bob'))
     AS UserCountTable(user_id, user_name)
   GROUP BY user_id, user_name
 EOF
@@ -208,7 +213,7 @@ JOB_ID=$($FLINK_DIR/bin/sql-client.sh embedded \
 
 wait_job_terminal_state "$JOB_ID" "FINISHED"
 
-verify_result_hash "SQL Client Elasticsearch Upsert" "$ELASTICSEARCH_INDEX" 3 "982cb32908def9801e781381c1b8a8db"
+verify_result_hash "SQL Client Elasticsearch Upsert" "$ELASTICSEARCH_INDEX" 3 "21a76360e2a40f442816d940e7071ccf"
 
 echo "Executing SQL: Values -> Elasticsearch (append, no key)"
 
@@ -218,10 +223,10 @@ INSERT INTO ElasticsearchAppendSinkTable
   FROM (
     VALUES
       (1, 'Bob', CAST(0 AS BIGINT)),
-      (22, 'Alice', CAST(0 AS BIGINT)),
-      (42, 'Greg', CAST(0 AS BIGINT)),
-      (42, 'Greg', CAST(0 AS BIGINT)),
-      (42, 'Greg', CAST(0 AS BIGINT)),
+      (22, 'Tom', CAST(0 AS BIGINT)),
+      (42, 'Kim', CAST(0 AS BIGINT)),
+      (42, 'Kim', CAST(0 AS BIGINT)),
+      (42, 'Kim', CAST(0 AS BIGINT)),
       (1, 'Bob', CAST(0 AS BIGINT)))
     AS UserCountTable(user_id, user_name, user_count)
 EOF
diff --git a/flink-table/flink-sql-client/conf/sql-client-defaults.yaml b/flink-table/flink-sql-client/conf/sql-client-defaults.yaml
index 2ce3af6..db6a0ea 100644
--- a/flink-table/flink-sql-client/conf/sql-client-defaults.yaml
+++ b/flink-table/flink-sql-client/conf/sql-client-defaults.yaml
@@ -50,6 +50,7 @@ tables: [] # empty list
 #   time-attribute: ...
 #   primary-key: ...
 
+
 #==============================================================================
 # User-defined functions
 #==============================================================================
@@ -63,6 +64,21 @@ functions: [] # empty list
 #   class: ...
 #   constructor: ...
 
+
+#==============================================================================
+# Catalogs
+#==============================================================================
+
+# Define catalogs here.
+
+catalogs: [] # empty list
+# A typical catalog definition looks like:
+#  - name: myhive
+#    type: hive
+#    hive-conf-dir: /opt/hive_conf/
+#    default-database: ...
+
+
 #==============================================================================
 # Execution properties
 #==============================================================================
@@ -70,6 +86,9 @@ functions: [] # empty list
 # Execution properties allow for changing the behavior of a table program.
 
 execution:
+  # select the implementation responsible for planning table programs
+  # possible values are 'old' (used by default) or 'blink'
+  planner: old
   # 'batch' or 'streaming' execution
   type: streaming
   # allow 'event-time' or only 'processing-time' in sources
@@ -88,15 +107,15 @@ execution:
   min-idle-state-retention: 0
   # maximum idle state retention in ms
   max-idle-state-retention: 0
+  # current catalog ('default_catalog' by default)
+  current-catalog: default_catalog
+  # current database of the current catalog (default database of the catalog by default)
+  current-database: default_database
   # controls how table programs are restarted in case of a failures
   restart-strategy:
     # strategy type
     # possible values are "fixed-delay", "failure-rate", "none", or "fallback" (default)
     type: fallback
-  # current catalog of SQL Client
-#  current-catalog: ...
-  # current database of the current catalog
-#  current-database: ...
 
 
 #==============================================================================
@@ -113,16 +132,3 @@ deployment:
   gateway-address: ""
   # (optional) port from cluster to gateway
   gateway-port: 0
-
-
-#==============================================================================
-# Catalogs
-#==============================================================================
-
-# Define catalogs here.
-
-catalogs: [] # empty list
-#  - name: myhive
-#    type: hive
-#    hive-conf-dir: /opt/hive_conf/
-#    default-database: ...
diff --git a/flink-table/flink-sql-client/pom.xml b/flink-table/flink-sql-client/pom.xml
index f725ff0..d5e95fc 100644
--- a/flink-table/flink-sql-client/pom.xml
+++ b/flink-table/flink-sql-client/pom.xml
@@ -84,6 +84,18 @@ under the License.
 			<version>${project.version}</version>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
 		<!-- logging utilities -->
 		<dependency>
 			<groupId>org.slf4j</groupId>
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/ExecutionEntry.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/ExecutionEntry.java
index 80d3efb..a65642b 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/ExecutionEntry.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/ExecutionEntry.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.client.config.entries;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.client.config.ConfigUtil;
 import org.apache.flink.table.client.config.Environment;
 import org.apache.flink.table.descriptors.DescriptorProperties;
@@ -49,6 +50,12 @@ public class ExecutionEntry extends ConfigEntry {
 	public static final ExecutionEntry DEFAULT_INSTANCE =
 		new ExecutionEntry(new DescriptorProperties(true));
 
+	private static final String EXECUTION_PLANNER = "planner";
+
+	public static final String EXECUTION_PLANNER_VALUE_OLD = "old";
+
+	public static final String EXECUTION_PLANNER_VALUE_BLINK = "blink";
+
 	private static final String EXECUTION_TYPE = "type";
 
 	private static final String EXECUTION_TYPE_VALUE_STREAMING = "streaming";
@@ -97,9 +104,9 @@ public class ExecutionEntry extends ConfigEntry {
 
 	private static final String EXECUTION_RESTART_STRATEGY_MAX_FAILURES_PER_INTERVAL = "restart-strategy.max-failures-per-interval";
 
-	public static final String EXECUTION_CURRNET_CATALOG = "current-catalog";
+	public static final String EXECUTION_CURRENT_CATALOG = "current-catalog";
 
-	public static final String EXECUTION_CURRNET_DATABASE = "current-database";
+	public static final String EXECUTION_CURRENT_DATABASE = "current-database";
 
 	private ExecutionEntry(DescriptorProperties properties) {
 		super(properties);
@@ -108,6 +115,12 @@ public class ExecutionEntry extends ConfigEntry {
 	@Override
 	protected void validate(DescriptorProperties properties) {
 		properties.validateEnumValues(
+			EXECUTION_PLANNER,
+			true,
+			Arrays.asList(
+				EXECUTION_PLANNER_VALUE_OLD,
+				EXECUTION_PLANNER_VALUE_BLINK));
+		properties.validateEnumValues(
 			EXECUTION_TYPE,
 			true,
 			Arrays.asList(
@@ -137,20 +150,73 @@ public class ExecutionEntry extends ConfigEntry {
 		properties.validateLong(EXECUTION_RESTART_STRATEGY_DELAY, true, 0);
 		properties.validateLong(EXECUTION_RESTART_STRATEGY_FAILURE_RATE_INTERVAL, true, 1);
 		properties.validateInt(EXECUTION_RESTART_STRATEGY_MAX_FAILURES_PER_INTERVAL, true, 1);
-		properties.validateString(EXECUTION_CURRNET_CATALOG, true, 1);
-		properties.validateString(EXECUTION_CURRNET_DATABASE, true, 1);
+		properties.validateString(EXECUTION_CURRENT_CATALOG, true, 1);
+		properties.validateString(EXECUTION_CURRENT_DATABASE, true, 1);
+	}
+
+	public EnvironmentSettings getEnvironmentSettings() {
+		final EnvironmentSettings.Builder builder = EnvironmentSettings.newInstance();
+
+		if (inStreamingMode()) {
+			builder.inStreamingMode();
+		} else if (inBatchMode()) {
+			builder.inBatchMode();
+		}
+
+		final String planner = properties.getOptionalString(EXECUTION_PLANNER)
+			.orElse(EXECUTION_PLANNER_VALUE_OLD);
+
+		if (planner.equals(EXECUTION_PLANNER_VALUE_OLD)) {
+			builder.useOldPlanner();
+		} else if (planner.equals(EXECUTION_PLANNER_VALUE_BLINK)) {
+			builder.useBlinkPlanner();
+		}
+
+		return builder.build();
 	}
 
-	public boolean isStreamingExecution() {
+	public boolean inStreamingMode() {
 		return properties.getOptionalString(EXECUTION_TYPE)
-			.map((v) -> v.equals(EXECUTION_TYPE_VALUE_STREAMING))
-			.orElse(false);
+				.map((v) -> v.equals(EXECUTION_TYPE_VALUE_STREAMING))
+				.orElse(false);
 	}
 
-	public boolean isBatchExecution() {
+	public boolean inBatchMode() {
 		return properties.getOptionalString(EXECUTION_TYPE)
-			.map((v) -> v.equals(EXECUTION_TYPE_VALUE_BATCH))
-			.orElse(false);
+				.map((v) -> v.equals(EXECUTION_TYPE_VALUE_BATCH))
+				.orElse(false);
+	}
+
+	public boolean isStreamingPlanner() {
+		final String planner = properties.getOptionalString(EXECUTION_PLANNER)
+			.orElse(EXECUTION_PLANNER_VALUE_OLD);
+
+		// Blink planner is a streaming planner
+		if (planner.equals(EXECUTION_PLANNER_VALUE_BLINK)) {
+			return true;
+		}
+		// Old planner can be a streaming or batch planner
+		else if (planner.equals(EXECUTION_PLANNER_VALUE_OLD)) {
+			return inStreamingMode();
+		}
+
+		return false;
+	}
+
+	public boolean isBatchPlanner() {
+		final String planner = properties.getOptionalString(EXECUTION_PLANNER)
+			.orElse(EXECUTION_PLANNER_VALUE_OLD);
+
+		// Blink planner is not a batch planner
+		if (planner.equals(EXECUTION_PLANNER_VALUE_BLINK)) {
+			return false;
+		}
+		// Old planner can be a streaming or batch planner
+		else if (planner.equals(EXECUTION_PLANNER_VALUE_OLD)) {
+			return inBatchMode();
+		}
+
+		return false;
 	}
 
 	public TimeCharacteristic getTimeCharacteristic() {
@@ -237,11 +303,11 @@ public class ExecutionEntry extends ConfigEntry {
 	}
 
 	public Optional<String> getCurrentCatalog() {
-		return properties.getOptionalString(EXECUTION_CURRNET_CATALOG);
+		return properties.getOptionalString(EXECUTION_CURRENT_CATALOG);
 	}
 
 	public Optional<String> getCurrentDatabase() {
-		return properties.getOptionalString(EXECUTION_CURRNET_DATABASE);
+		return properties.getOptionalString(EXECUTION_CURRENT_DATABASE);
 	}
 
 	public boolean isChangelogMode() {
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java
index d2a7da2..11f4224 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java
@@ -79,23 +79,23 @@ public class SessionContext {
 	}
 
 	public Optional<String> getCurrentCatalog() {
-		return Optional.ofNullable(sessionProperties.get(ExecutionEntry.EXECUTION_CURRNET_CATALOG));
+		return Optional.ofNullable(sessionProperties.get(ExecutionEntry.EXECUTION_CURRENT_CATALOG));
 	}
 
 	public void setCurrentCatalog(String currentCatalog) {
 		checkArgument(!StringUtils.isNullOrWhitespaceOnly(currentCatalog));
 
-		sessionProperties.put(ExecutionEntry.EXECUTION_CURRNET_CATALOG, currentCatalog);
+		sessionProperties.put(ExecutionEntry.EXECUTION_CURRENT_CATALOG, currentCatalog);
 	}
 
 	public Optional<String> getCurrentDatabase() {
-		return Optional.ofNullable(sessionProperties.get(ExecutionEntry.EXECUTION_CURRNET_DATABASE));
+		return Optional.ofNullable(sessionProperties.get(ExecutionEntry.EXECUTION_CURRENT_DATABASE));
 	}
 
 	public void setCurrentDatabase(String currentDatabase) {
 		checkArgument(!StringUtils.isNullOrWhitespaceOnly(currentDatabase));
 
-		sessionProperties.put(ExecutionEntry.EXECUTION_CURRNET_DATABASE, currentDatabase);
+		sessionProperties.put(ExecutionEntry.EXECUTION_CURRENT_DATABASE, currentDatabase);
 	}
 
 	public Environment getEnvironment() {
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectBatchTableSink.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectBatchTableSink.java
index 55f0038..9dccfb7 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectBatchTableSink.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectBatchTableSink.java
@@ -18,18 +18,20 @@
 
 package org.apache.flink.table.client.gateway.local;
 
+import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.table.sinks.BatchTableSink;
+import org.apache.flink.table.sinks.OutputFormatTableSink;
 import org.apache.flink.types.Row;
 
 /**
  * Table sink for collecting the results locally all at once using accumulators.
  */
-public class CollectBatchTableSink implements BatchTableSink<Row> {
+public class CollectBatchTableSink extends OutputFormatTableSink<Row> implements BatchTableSink<Row> {
 
 	private final String accumulatorName;
 	private final TypeSerializer<Row> serializer;
@@ -42,6 +44,13 @@ public class CollectBatchTableSink implements BatchTableSink<Row> {
 		this.serializer = serializer;
 	}
 
+	/**
+	 * Returns the serializer for deserializing the collected result.
+	 */
+	public TypeSerializer<Row> getSerializer() {
+		return serializer;
+	}
+
 	@Override
 	public TypeInformation<Row> getOutputType() {
 		return Types.ROW_NAMED(fieldNames, fieldTypes);
@@ -72,10 +81,8 @@ public class CollectBatchTableSink implements BatchTableSink<Row> {
 			.name("SQL Client Batch Collect Sink");
 	}
 
-	/**
-	 * Returns the serializer for deserializing the collected result.
-	 */
-	public TypeSerializer<Row> getSerializer() {
-		return serializer;
+	@Override
+	public OutputFormat<Row> getOutputFormat() {
+		return new Utils.CollectHelper<>(accumulatorName, serializer);
 	}
 }
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamTableSink.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamTableSink.java
index e36eb35..ce8565a 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamTableSink.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamTableSink.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.experimental.CollectSink;
 import org.apache.flink.table.sinks.RetractStreamTableSink;
 import org.apache.flink.types.Row;
@@ -73,8 +74,13 @@ public class CollectStreamTableSink implements RetractStreamTableSink<Row> {
 
 	@Override
 	public void emitDataStream(DataStream<Tuple2<Boolean, Row>> stream) {
+		consumeDataStream(stream);
+	}
+
+	@Override
+	public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> stream) {
 		// add sink
-		stream
+		return stream
 			.addSink(new CollectSink<>(targetAddress, targetPort, serializer))
 			.name("SQL Client Stream Collect Sink")
 			.setParallelism(1);
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index ae7fb2c..f212e98 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -39,13 +39,20 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.BatchQueryConfig;
+import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.QueryConfig;
 import org.apache.flink.table.api.StreamQueryConfig;
 import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.java.BatchTableEnvironment;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl;
 import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.FunctionCatalog;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
 import org.apache.flink.table.client.config.Environment;
 import org.apache.flink.table.client.config.entries.DeploymentEntry;
 import org.apache.flink.table.client.config.entries.ExecutionEntry;
@@ -56,17 +63,23 @@ import org.apache.flink.table.client.config.entries.TemporalTableEntry;
 import org.apache.flink.table.client.config.entries.ViewEntry;
 import org.apache.flink.table.client.gateway.SessionContext;
 import org.apache.flink.table.client.gateway.SqlExecutionException;
+import org.apache.flink.table.delegation.Executor;
+import org.apache.flink.table.delegation.ExecutorFactory;
+import org.apache.flink.table.delegation.Planner;
+import org.apache.flink.table.delegation.PlannerFactory;
 import org.apache.flink.table.factories.BatchTableSinkFactory;
 import org.apache.flink.table.factories.BatchTableSourceFactory;
 import org.apache.flink.table.factories.CatalogFactory;
-import org.apache.flink.table.factories.StreamTableSinkFactory;
-import org.apache.flink.table.factories.StreamTableSourceFactory;
+import org.apache.flink.table.factories.ComponentFactoryService;
 import org.apache.flink.table.factories.TableFactoryService;
+import org.apache.flink.table.factories.TableSinkFactory;
+import org.apache.flink.table.factories.TableSourceFactory;
 import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.FunctionService;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.functions.TableFunction;
 import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.table.planner.delegation.ExecutorBase;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.util.FlinkException;
@@ -74,6 +87,7 @@ import org.apache.flink.util.FlinkException;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Options;
 
+import java.lang.reflect.Method;
 import java.net.URL;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -246,11 +260,11 @@ public class ExecutionContext<T> {
 	}
 
 	private static TableSource<?> createTableSource(ExecutionEntry execution, Map<String, String> sourceProperties, ClassLoader classLoader) {
-		if (execution.isStreamingExecution()) {
-			final StreamTableSourceFactory<?> factory = (StreamTableSourceFactory<?>)
-				TableFactoryService.find(StreamTableSourceFactory.class, sourceProperties, classLoader);
-			return factory.createStreamTableSource(sourceProperties);
-		} else if (execution.isBatchExecution()) {
+		if (execution.isStreamingPlanner()) {
+			final TableSourceFactory<?> factory = (TableSourceFactory<?>)
+				TableFactoryService.find(TableSourceFactory.class, sourceProperties, classLoader);
+			return factory.createTableSource(sourceProperties);
+		} else if (execution.isBatchPlanner()) {
 			final BatchTableSourceFactory<?> factory = (BatchTableSourceFactory<?>)
 				TableFactoryService.find(BatchTableSourceFactory.class, sourceProperties, classLoader);
 			return factory.createBatchTableSource(sourceProperties);
@@ -259,11 +273,11 @@ public class ExecutionContext<T> {
 	}
 
 	private static TableSink<?> createTableSink(ExecutionEntry execution, Map<String, String> sinkProperties, ClassLoader classLoader) {
-		if (execution.isStreamingExecution()) {
-			final StreamTableSinkFactory<?> factory = (StreamTableSinkFactory<?>)
-				TableFactoryService.find(StreamTableSinkFactory.class, sinkProperties, classLoader);
-			return factory.createStreamTableSink(sinkProperties);
-		} else if (execution.isBatchExecution()) {
+		if (execution.isStreamingPlanner()) {
+			final TableSinkFactory<?> factory = (TableSinkFactory<?>)
+				TableFactoryService.find(TableSinkFactory.class, sinkProperties, classLoader);
+			return factory.createTableSink(sinkProperties);
+		} else if (execution.isBatchPlanner()) {
 			final BatchTableSinkFactory<?> factory = (BatchTableSinkFactory<?>)
 				TableFactoryService.find(BatchTableSinkFactory.class, sinkProperties, classLoader);
 			return factory.createBatchTableSink(sinkProperties);
@@ -271,6 +285,53 @@ public class ExecutionContext<T> {
 		throw new SqlExecutionException("Unsupported execution type for sinks.");
 	}
 
+	private static TableEnvironment createStreamTableEnvironment(
+			StreamExecutionEnvironment env,
+			EnvironmentSettings settings,
+			Executor executor) {
+
+		final TableConfig config = TableConfig.getDefault();
+
+		final CatalogManager catalogManager = new CatalogManager(
+			settings.getBuiltInCatalogName(),
+			new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName()));
+
+		final FunctionCatalog functionCatalog = new FunctionCatalog(catalogManager);
+
+		final Map<String, String> plannerProperties = settings.toPlannerProperties();
+		final Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
+			.create(plannerProperties, executor, config, functionCatalog, catalogManager);
+
+		return new StreamTableEnvironmentImpl(
+			catalogManager,
+			functionCatalog,
+			config,
+			env,
+			planner,
+			executor,
+			settings.isStreamingMode()
+		);
+	}
+
+	private static Executor lookupExecutor(
+			Map<String, String> executorProperties,
+			StreamExecutionEnvironment executionEnvironment) {
+		try {
+			ExecutorFactory executorFactory = ComponentFactoryService.find(ExecutorFactory.class, executorProperties);
+			Method createMethod = executorFactory.getClass()
+				.getMethod("create", Map.class, StreamExecutionEnvironment.class);
+
+			return (Executor) createMethod.invoke(
+				executorFactory,
+				executorProperties,
+				executionEnvironment);
+		} catch (Exception e) {
+			throw new TableException(
+				"Could not instantiate the executor. Make sure a planner module is on the classpath",
+				e);
+		}
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	/**
@@ -283,17 +344,25 @@ public class ExecutionContext<T> {
 		private final QueryConfig queryConfig;
 		private final ExecutionEnvironment execEnv;
 		private final StreamExecutionEnvironment streamExecEnv;
+		private final Executor executor;
 		private final TableEnvironment tableEnv;
 
 		private EnvironmentInstance() {
+			// create settings
+			final EnvironmentSettings settings = mergedEnv.getExecution().getEnvironmentSettings();
+
 			// create environments
-			if (mergedEnv.getExecution().isStreamingExecution()) {
+			if (mergedEnv.getExecution().isStreamingPlanner()) {
 				streamExecEnv = createStreamExecutionEnvironment();
 				execEnv = null;
-				tableEnv = StreamTableEnvironment.create(streamExecEnv);
-			} else if (mergedEnv.getExecution().isBatchExecution()) {
+
+				final Map<String, String> executorProperties = settings.toExecutorProperties();
+				executor = lookupExecutor(executorProperties, streamExecEnv);
+				tableEnv = createStreamTableEnvironment(streamExecEnv, settings, executor);
+			} else if (mergedEnv.getExecution().isBatchPlanner()) {
 				streamExecEnv = null;
 				execEnv = createExecutionEnvironment();
+				executor = null;
 				tableEnv = BatchTableEnvironment.create(execEnv);
 			} else {
 				throw new SqlExecutionException("Unsupported execution type specified.");
@@ -378,6 +447,11 @@ public class ExecutionContext<T> {
 
 		private FlinkPlan createPlan(String name, Configuration flinkConfig) {
 			if (streamExecEnv != null) {
+				// special case for Blink planner to apply batch optimizations
+				// note: it also modifies the ExecutionConfig!
+				if (executor instanceof ExecutorBase) {
+					return ((ExecutorBase) executor).generateStreamGraph(name);
+				}
 				return streamExecEnv.getStreamGraph(name);
 			} else {
 				final int parallelism = execEnv.getParallelism();
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index 101f72c..5d91964 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -40,7 +40,6 @@ import org.apache.flink.table.api.StreamQueryConfig;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.internal.TableEnvImpl;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.table.calcite.FlinkTypeFactory;
 import org.apache.flink.table.client.SqlClientException;
@@ -302,7 +301,7 @@ public class LocalExecutor implements Executor {
 
 		try {
 			return context.wrapClassLoader(() ->
-				Arrays.asList(((TableEnvImpl) tableEnv).getCompletionHints(statement, position)));
+				Arrays.asList(tableEnv.getCompletionHints(statement, position)));
 		} catch (Throwable t) {
 			// catch everything such that the query does not crash the executor
 			if (LOG.isDebugEnabled()) {
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java
index dba7ed6..c3cc12b 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java
@@ -61,7 +61,7 @@ public class ResultStore {
 
 		final RowTypeInfo outputType = new RowTypeInfo(schema.getFieldTypes(), schema.getFieldNames());
 
-		if (env.getExecution().isStreamingExecution()) {
+		if (env.getExecution().inStreamingMode()) {
 			// determine gateway address (and port if possible)
 			final InetAddress gatewayAddress = getGatewayAddress(env.getDeployment());
 			final int gatewayPort = getGatewayPort(env.getDeployment());
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java
index 4cddb53..2ad69e4 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java
@@ -51,6 +51,7 @@ public class EnvironmentTest {
 	@Test
 	public void testMerging() throws Exception {
 		final Map<String, String> replaceVars1 = new HashMap<>();
+		replaceVars1.put("$VAR_PLANNER", "old");
 		replaceVars1.put("$VAR_EXECUTION_TYPE", "batch");
 		replaceVars1.put("$VAR_RESULT_MODE", "table");
 		replaceVars1.put("$VAR_UPDATE_MODE", "");
@@ -76,7 +77,7 @@ public class EnvironmentTest {
 		tables.add("TestView2");
 
 		assertEquals(tables, merged.getTables().keySet());
-		assertTrue(merged.getExecution().isStreamingExecution());
+		assertTrue(merged.getExecution().inStreamingMode());
 		assertEquals(16, merged.getExecution().getMaxParallelism());
 	}
 
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
index fb0c80c..aaa8321 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
@@ -247,6 +247,7 @@ public class ExecutionContextTest {
 
 	private <T> ExecutionContext<T> createDefaultExecutionContext() throws Exception {
 		final Map<String, String> replaceVars = new HashMap<>();
+		replaceVars.put("$VAR_PLANNER", "old");
 		replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
 		replaceVars.put("$VAR_RESULT_MODE", "changelog");
 		replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
@@ -256,6 +257,7 @@ public class ExecutionContextTest {
 
 	private <T> ExecutionContext<T> createCatalogExecutionContext() throws Exception {
 		final Map<String, String> replaceVars = new HashMap<>();
+		replaceVars.put("$VAR_PLANNER", "old");
 		replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
 		replaceVars.put("$VAR_RESULT_MODE", "changelog");
 		replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index 5dbec41..838b95a 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.client.config.Environment;
+import org.apache.flink.table.client.config.entries.ExecutionEntry;
 import org.apache.flink.table.client.config.entries.ViewEntry;
 import org.apache.flink.table.client.gateway.Executor;
 import org.apache.flink.table.client.gateway.ProgramTargetDescriptor;
@@ -52,6 +53,10 @@ import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
 
 import java.io.File;
 import java.io.IOException;
@@ -75,8 +80,16 @@ import static org.junit.Assert.fail;
 /**
  * Contains basic tests for the {@link LocalExecutor}.
  */
+@RunWith(Parameterized.class)
 public class LocalExecutorITCase extends TestLogger {
 
+	@Parameters(name = "Planner: {0}")
+	public static List<String> planner() {
+		return Arrays.asList(
+			ExecutionEntry.EXECUTION_PLANNER_VALUE_OLD,
+			ExecutionEntry.EXECUTION_PLANNER_VALUE_BLINK);
+	}
+
 	private static final String DEFAULTS_ENVIRONMENT_FILE = "test-sql-client-defaults.yaml";
 	private static final String CATALOGS_ENVIRONMENT_FILE = "test-sql-client-catalogs.yaml";
 
@@ -110,6 +123,9 @@ public class LocalExecutorITCase extends TestLogger {
 		return config;
 	}
 
+	@Parameter
+	public String planner;
+
 	@Test
 	public void testValidateSession() throws Exception {
 		final Executor executor = createDefaultExecutor(clusterClient);
@@ -174,7 +190,7 @@ public class LocalExecutorITCase extends TestLogger {
 
 		final List<String> actualDatabases = executor.listDatabases(session);
 
-		final List<String> expectedDatabases = Arrays.asList("default_database");
+		final List<String> expectedDatabases = Collections.singletonList("default_database");
 		assertEquals(expectedDatabases, actualDatabases);
 	}
 
@@ -220,6 +236,7 @@ public class LocalExecutorITCase extends TestLogger {
 		final Map<String, String> actualProperties = executor.getSessionProperties(session);
 
 		final Map<String, String> expectedProperties = new HashMap<>();
+		expectedProperties.put("execution.planner", planner);
 		expectedProperties.put("execution.type", "batch");
 		expectedProperties.put("execution.time-characteristic", "event-time");
 		expectedProperties.put("execution.periodic-watermarks-interval", "99");
@@ -275,6 +292,7 @@ public class LocalExecutorITCase extends TestLogger {
 		final URL url = getClass().getClassLoader().getResource("test-data.csv");
 		Objects.requireNonNull(url);
 		final Map<String, String> replaceVars = new HashMap<>();
+		replaceVars.put("$VAR_PLANNER", planner);
 		replaceVars.put("$VAR_SOURCE_PATH1", url.getPath());
 		replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
 		replaceVars.put("$VAR_RESULT_MODE", "changelog");
@@ -315,6 +333,7 @@ public class LocalExecutorITCase extends TestLogger {
 		Objects.requireNonNull(url);
 
 		final Map<String, String> replaceVars = new HashMap<>();
+		replaceVars.put("$VAR_PLANNER", planner);
 		replaceVars.put("$VAR_SOURCE_PATH1", url.getPath());
 		replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
 		replaceVars.put("$VAR_RESULT_MODE", "table");
@@ -340,6 +359,7 @@ public class LocalExecutorITCase extends TestLogger {
 		Objects.requireNonNull(url);
 
 		final Map<String, String> replaceVars = new HashMap<>();
+		replaceVars.put("$VAR_PLANNER", planner);
 		replaceVars.put("$VAR_SOURCE_PATH1", url.getPath());
 		replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
 		replaceVars.put("$VAR_RESULT_MODE", "table");
@@ -359,6 +379,7 @@ public class LocalExecutorITCase extends TestLogger {
 		final URL url = getClass().getClassLoader().getResource("test-data.csv");
 		Objects.requireNonNull(url);
 		final Map<String, String> replaceVars = new HashMap<>();
+		replaceVars.put("$VAR_PLANNER", planner);
 		replaceVars.put("$VAR_SOURCE_PATH1", url.getPath());
 		replaceVars.put("$VAR_EXECUTION_TYPE", "batch");
 		replaceVars.put("$VAR_RESULT_MODE", "table");
@@ -395,6 +416,7 @@ public class LocalExecutorITCase extends TestLogger {
 		final URL url = getClass().getClassLoader().getResource("test-data.csv");
 		Objects.requireNonNull(url);
 		final Map<String, String> replaceVars = new HashMap<>();
+		replaceVars.put("$VAR_PLANNER", planner);
 		replaceVars.put("$VAR_SOURCE_PATH1", url.getPath());
 		replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
 		replaceVars.put("$VAR_SOURCE_SINK_PATH", csvOutputPath);
@@ -454,6 +476,7 @@ public class LocalExecutorITCase extends TestLogger {
 		final URL url = getClass().getClassLoader().getResource("test-data.csv");
 		Objects.requireNonNull(url);
 		final Map<String, String> replaceVars = new HashMap<>();
+		replaceVars.put("$VAR_PLANNER", planner);
 		replaceVars.put("$VAR_SOURCE_PATH1", url.getPath());
 		replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
 		replaceVars.put("$VAR_SOURCE_SINK_PATH", csvOutputPath);
@@ -519,6 +542,7 @@ public class LocalExecutorITCase extends TestLogger {
 
 	private <T> LocalExecutor createDefaultExecutor(ClusterClient<T> clusterClient) throws Exception {
 		final Map<String, String> replaceVars = new HashMap<>();
+		replaceVars.put("$VAR_PLANNER", planner);
 		replaceVars.put("$VAR_EXECUTION_TYPE", "batch");
 		replaceVars.put("$VAR_UPDATE_MODE", "");
 		replaceVars.put("$VAR_MAX_ROWS", "100");
diff --git a/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml b/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
index 9844d54..d3e917d 100644
--- a/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
+++ b/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
@@ -115,7 +115,15 @@ functions:
       - type: LONG
         value: 5
 
+catalogs:
+  - name: catalog1
+    type: DependencyTest
+  - name: simple-catalog
+    type: simple-catalog
+    test-table: test-table
+
 execution:
+  planner: "$VAR_PLANNER"
   type: "$VAR_EXECUTION_TYPE"
   time-characteristic: event-time
   periodic-watermarks-interval: 99
@@ -133,10 +141,3 @@ execution:
 
 deployment:
   response-timeout: 5000
-
-catalogs:
-  - name: catalog1
-    type: DependencyTest
-  - name: simple-catalog
-    type: simple-catalog
-    test-table: test-table
diff --git a/tools/travis/splits/split_misc.sh b/tools/travis/splits/split_misc.sh
index 35c8465..60eb91b 100755
--- a/tools/travis/splits/split_misc.sh
+++ b/tools/travis/splits/split_misc.sh
@@ -67,7 +67,8 @@ run_test "Avro Confluent Schema Registry nightly end-to-end test" "$END_TO_END_D
 run_test "State TTL Heap backend end-to-end test" "$END_TO_END_DIR/test-scripts/test_stream_state_ttl.sh file"
 run_test "State TTL RocksDb backend end-to-end test" "$END_TO_END_DIR/test-scripts/test_stream_state_ttl.sh rocks"
 
-run_test "SQL Client end-to-end test" "$END_TO_END_DIR/test-scripts/test_sql_client.sh"
+run_test "SQL Client end-to-end test (Old planner)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh old"
+run_test "SQL Client end-to-end test (Blink planner)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh blink"
 run_test "SQL Client end-to-end test for Kafka 0.10" "$END_TO_END_DIR/test-scripts/test_sql_client_kafka010.sh"
 run_test "SQL Client end-to-end test for Kafka 0.11" "$END_TO_END_DIR/test-scripts/test_sql_client_kafka011.sh"
 run_test "SQL Client end-to-end test for modern Kafka" "$END_TO_END_DIR/test-scripts/test_sql_client_kafka.sh"


[flink] 04/06: [hotfix][table-planner-blink] Update Blink's ExpressionReducer classloading to Flink's behavior

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4bb72d0396051a422b628b22242256f28574e36e
Author: Timo Walther <tw...@apache.org>
AuthorDate: Mon Jul 29 11:44:49 2019 +0200

    [hotfix][table-planner-blink] Update Blink's ExpressionReducer classloading to Flink's behavior
---
 .../org/apache/flink/table/planner/codegen/ExpressionReducer.scala      | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala
index 57b6fa0..bb5ff90 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala
@@ -96,7 +96,7 @@ class ExpressionReducer(
       resultType,
       EMPTY_ROW_TYPE)
 
-    val function = generatedFunction.newInstance(getClass.getClassLoader)
+    val function = generatedFunction.newInstance(Thread.currentThread().getContextClassLoader)
     val richMapFunction = function match {
       case r: RichMapFunction[GenericRow, GenericRow] => r
       case _ => throw new TableException("RichMapFunction[GenericRow, GenericRow] required here")


[flink] 01/06: [table-api-java] Postpone check for Blink planner in StreamTableEnvironment

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f1a2312f62f701cfd0a8f29d43e3e96f054c254d
Author: Timo Walther <tw...@apache.org>
AuthorDate: Mon Jul 29 11:40:35 2019 +0200

    [table-api-java] Postpone check for Blink planner in StreamTableEnvironment
---
 .../table/api/java/internal/StreamTableEnvironmentImpl.java  | 12 +++++-------
 1 file changed, 5 insertions(+), 7 deletions(-)

diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
index 2e35bb9..8987940 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.api.java.internal;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.dag.Transformation;
@@ -79,7 +78,6 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
 
 	private final StreamExecutionEnvironment executionEnvironment;
 
-	@VisibleForTesting
 	public StreamTableEnvironmentImpl(
 			CatalogManager catalogManager,
 			FunctionCatalog functionCatalog,
@@ -90,11 +88,6 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
 			boolean isStreamingMode) {
 		super(catalogManager, tableConfig, executor, functionCatalog, planner, isStreamingMode);
 		this.executionEnvironment = executionEnvironment;
-
-		if (!isStreamingMode) {
-			throw new TableException(
-				"StreamTableEnvironment is not supported in batch mode now, please use TableEnvironment.");
-		}
 	}
 
 	public static StreamTableEnvironment create(
@@ -102,6 +95,11 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
 			EnvironmentSettings settings,
 			TableConfig tableConfig) {
 
+		if (!settings.isStreamingMode()) {
+			throw new TableException(
+				"StreamTableEnvironment can not run in batch mode for now, please use TableEnvironment.");
+		}
+
 		CatalogManager catalogManager = new CatalogManager(
 			settings.getBuiltInCatalogName(),
 			new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName()));