You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ji...@apache.org on 2019/08/01 14:52:35 UTC

[flink] branch release-1.9 updated: [FLINK-12704][python] Enable the configuration of using blink planner.

This is an automated email from the ASF dual-hosted git repository.

jincheng pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new e5a9904  [FLINK-12704][python] Enable the configuration of using blink planner.
e5a9904 is described below

commit e5a99049daf9cc53b2e6957157c5901448d1ad46
Author: Wei Zhong <we...@gmail.com>
AuthorDate: Thu Aug 1 17:15:29 2019 +0800

    [FLINK-12704][python] Enable the configuration of using blink planner.
    
    This closes #9314
---
 flink-python/pyflink/table/__init__.py             |   4 +
 flink-python/pyflink/table/environment_settings.py | 199 +++++++++++++++++++++
 flink-python/pyflink/table/table_environment.py    | 141 ++++++++++++---
 .../table/tests/test_environment_settings.py       | 133 ++++++++++++++
 .../test_environment_settings_completeness.py      |  67 +++++++
 .../table/tests/test_table_environment_api.py      |  92 +++++++++-
 flink-python/pyflink/testing/test_case_utils.py    |  19 ++
 7 files changed, 626 insertions(+), 29 deletions(-)

diff --git a/flink-python/pyflink/table/__init__.py b/flink-python/pyflink/table/__init__.py
index 48a150e..e69a9b7 100644
--- a/flink-python/pyflink/table/__init__.py
+++ b/flink-python/pyflink/table/__init__.py
@@ -27,6 +27,8 @@ Important classes of Flink Table API:
     - :class:`pyflink.table.TableConfig`
       A config to define the runtime behavior of the Table API.
       It is necessary when creating :class:`TableEnvironment`.
+    - :class:`pyflink.table.EnvironmentSettings`
+      Defines all parameters that initialize a table environment.
     - :class:`pyflink.table.StreamQueryConfig` and :class:`pyflink.table.BatchQueryConfig`
       A query config holds parameters to configure the behavior of queries.
     - :class:`pyflink.table.TableSource`
@@ -53,6 +55,7 @@ Important classes of Flink Table API:
 """
 from __future__ import absolute_import
 
+from pyflink.table.environment_settings import EnvironmentSettings
 from pyflink.table.table import Table, GroupedTable, GroupWindowedTable, OverWindowedTable, \
     WindowGroupedTable
 from pyflink.table.table_config import TableConfig
@@ -67,6 +70,7 @@ __all__ = [
     'TableEnvironment',
     'StreamTableEnvironment',
     'BatchTableEnvironment',
+    'EnvironmentSettings',
     'Table',
     'GroupedTable',
     'GroupWindowedTable',
diff --git a/flink-python/pyflink/table/environment_settings.py b/flink-python/pyflink/table/environment_settings.py
new file mode 100644
index 0000000..d6fda40
--- /dev/null
+++ b/flink-python/pyflink/table/environment_settings.py
@@ -0,0 +1,199 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from pyflink.java_gateway import get_gateway
+
+__all__ = ['EnvironmentSettings']
+
+
+class EnvironmentSettings(object):
+    """
+    Defines all parameters that initialize a table environment. Those parameters are used only
+    during instantiation of a :class:`~pyflink.table.TableEnvironment` and cannot be changed
+    afterwards.
+
+    Example:
+    ::
+
+        >>> EnvironmentSettings.new_instance() \\
+        ...     .use_old_planner() \\
+        ...     .in_streaming_mode() \\
+        ...     .with_built_in_catalog_name("my_catalog") \\
+        ...     .with_built_in_database_name("my_database") \\
+        ...     .build()
+    """
+
+    class Builder(object):
+        """
+        A builder for :class:`EnvironmentSettings`.
+        """
+
+        def __init__(self):
+            gateway = get_gateway()
+            self._j_builder = gateway.jvm.EnvironmentSettings.Builder()
+
+        def use_old_planner(self):
+            """
+            Sets the old Flink planner as the required module.
+
+            This is the default behavior.
+
+            :return: This object.
+            :rtype: EnvironmentSettings.Builder
+            """
+            self._j_builder = self._j_builder.useOldPlanner()
+            return self
+
+        def use_blink_planner(self):
+            """
+            Sets the Blink planner as the required module. By default, :func:`use_old_planner` is
+            enabled.
+
+            :return: This object.
+            :rtype: EnvironmentSettings.Builder
+            """
+            self._j_builder = self._j_builder.useBlinkPlanner()
+            return self
+
+        def use_any_planner(self):
+            """
+            Does not set a planner requirement explicitly.
+
+            A planner will be discovered automatically, if there is only one planner available.
+
+            By default, :func:`use_old_planner` is enabled.
+
+            :return: This object.
+            :rtype: EnvironmentSettings.Builder
+            """
+            self._j_builder = self._j_builder.useAnyPlanner()
+            return self
+
+        def in_batch_mode(self):
+            """
+            Sets that the components should work in a batch mode. Streaming mode by default.
+
+            :return: This object.
+            :rtype: EnvironmentSettings.Builder
+            """
+            self._j_builder = self._j_builder.inBatchMode()
+            return self
+
+        def in_streaming_mode(self):
+            """
+            Sets that the components should work in a streaming mode. Enabled by default.
+
+            :return: This object.
+            :rtype: EnvironmentSettings.Builder
+            """
+            self._j_builder = self._j_builder.inStreamingMode()
+            return self
+
+        def with_built_in_catalog_name(self, built_in_catalog_name):
+            """
+            Specifies the name of the initial catalog to be created when instantiating
+            a :class:`~pyflink.table.TableEnvironment`. This catalog will be used to store all
+            non-serializable objects such as tables and functions registered via e.g.
+            :func:`~pyflink.table.TableEnvironment.register_table_sink` or
+            :func:`~pyflink.table.TableEnvironment.register_java_function`. It will also be the
+            initial value for the current catalog which can be altered via
+            :func:`~pyflink.table.TableEnvironment.use_catalog`.
+
+            Default: "default_catalog".
+
+            :param built_in_catalog_name: The specified built-in catalog name.
+            :type built_in_catalog_name: str
+            :return: This object.
+            :rtype: EnvironmentSettings.Builder
+            """
+            self._j_builder = self._j_builder.withBuiltInCatalogName(built_in_catalog_name)
+            return self
+
+        def with_built_in_database_name(self, built_in_database_name):
+            """
+            Specifies the name of the default database in the initial catalog to be
+            created when instantiating a :class:`~pyflink.table.TableEnvironment`. The database
+            will be used to store all non-serializable objects such as tables and functions
+            registered via e.g. :func:`~pyflink.table.TableEnvironment.register_table_sink` or
+            :func:`~pyflink.table.TableEnvironment.register_java_function`. It will also be the
+            initial value for the current database which can be altered via
+            :func:`~pyflink.table.TableEnvironment.use_database`.
+
+            Default: "default_database".
+
+            :param built_in_database_name: The specified built-in database name.
+            :type built_in_database_name: str
+            :return: This object.
+            :rtype: EnvironmentSettings.Builder
+            """
+            self._j_builder = self._j_builder.withBuiltInDatabaseName(built_in_database_name)
+            return self
+
+        def build(self):
+            """
+            Returns an immutable instance of EnvironmentSettings.
+
+            :return: an immutable instance of EnvironmentSettings.
+            :rtype: EnvironmentSettings
+            """
+            return EnvironmentSettings(self._j_builder.build())
+
+    def __init__(self, j_environment_settings):
+        self._j_environment_settings = j_environment_settings
+
+    def get_built_in_catalog_name(self):
+        """
+        Gets the specified name of the initial catalog to be created when instantiating a
+        :class:`~pyflink.table.TableEnvironment`.
+
+        :return: The specified name of the initial catalog to be created.
+        :rtype: str
+        """
+        return self._j_environment_settings.getBuiltInCatalogName()
+
+    def get_built_in_database_name(self):
+        """
+        Gets the specified name of the default database in the initial catalog to be created when
+        instantiating a :class:`~pyflink.table.TableEnvironment`.
+
+        :return: The specified name of the default database in the initial catalog to be created.
+        :rtype: str
+        """
+        return self._j_environment_settings.getBuiltInDatabaseName()
+
+    def is_streaming_mode(self):
+        """
+        Tells if the :class:`~pyflink.table.TableEnvironment` should work in a batch or streaming
+        mode.
+
+        :return: True if the TableEnvironment should work in a streaming mode, False otherwise.
+        :rtype: bool
+        """
+        return self._j_environment_settings.isStreamingMode()
+
+    @staticmethod
+    def new_instance():
+        """
+        Creates a builder for creating an instance of EnvironmentSettings.
+
+        By default, the old Flink planner is required (see :func:`Builder.use_old_planner`) and
+        the components work in streaming mode (see :func:`Builder.in_streaming_mode`).
+
+        :return: A builder of EnvironmentSettings.
+        :rtype: EnvironmentSettings.Builder
+        """
+        return EnvironmentSettings.Builder()
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index 2c23deb..b7ad5b5 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -19,6 +19,8 @@ import os
 import tempfile
 from abc import ABCMeta, abstractmethod
 
+from py4j.java_gateway import get_java_class
+
 from pyflink.serializers import BatchedSerializer, PickleSerializer
 from pyflink.table.catalog import Catalog
 from pyflink.table.table_config import TableConfig
@@ -730,32 +732,58 @@ class StreamTableEnvironment(TableEnvironment):
             self._j_tenv.connect(connector_descriptor._j_connector_descriptor))
 
     @staticmethod
-    def create(stream_execution_environment, table_config=None):
+    def create(stream_execution_environment, table_config=None, environment_settings=None):
         """
-        Creates a :class:`TableEnvironment` for a :class:`StreamExecutionEnvironment`
+        Creates a :class:`TableEnvironment` for a
+        :class:`~pyflink.datastream.StreamExecutionEnvironment`.
 
         Example:
         ::
 
             >>> env = StreamExecutionEnvironment.get_execution_environment()
-            # create without TableConfig
+            # create without optional parameters.
             >>> table_env = StreamTableEnvironment.create(env)
             # create with TableConfig
             >>> table_config = TableConfig()
             >>> table_config.set_null_check(False)
             >>> table_env = StreamTableEnvironment.create(env, table_config)
+            # create with EnvironmentSettings
+            >>> environment_settings = EnvironmentSettings.new_instance().use_blink_planner() \\
+            ...     .build()
+            >>> table_env = StreamTableEnvironment.create(
+            ...     env, environment_settings=environment_settings)
+
 
-        :param stream_execution_environment: The :class:`StreamExecutionEnvironment` of the
-                                             TableEnvironment.
+        :param stream_execution_environment: The
+                                             :class:`~pyflink.datastream.StreamExecutionEnvironment`
+                                             of the TableEnvironment.
+        :type stream_execution_environment: pyflink.datastream.StreamExecutionEnvironment
         :param table_config: The configuration of the TableEnvironment, optional.
+        :type table_config: TableConfig
+        :param environment_settings: The environment settings used to instantiate the
+                                     TableEnvironment. It provides the interfaces about planner
+                                     selection (flink or blink), optional.
+        :type environment_settings: pyflink.table.EnvironmentSettings
         :return: The :class:`StreamTableEnvironment` created from given StreamExecutionEnvironment
                  and configuration.
+        :rtype: StreamTableEnvironment
         """
+        if table_config is not None and environment_settings is not None:
+            raise ValueError("The param 'table_config' and "
+                             "'environment_settings' cannot be used at the same time")
+
         gateway = get_gateway()
         if table_config is not None:
             j_tenv = gateway.jvm.StreamTableEnvironment.create(
                 stream_execution_environment._j_stream_execution_environment,
                 table_config._j_table_config)
+        elif environment_settings is not None:
+            if not environment_settings.is_streaming_mode():
+                raise ValueError("The environment settings for StreamTableEnvironment must be "
+                                 "set to streaming mode.")
+            j_tenv = gateway.jvm.StreamTableEnvironment.create(
+                stream_execution_environment._j_stream_execution_environment,
+                environment_settings._j_environment_settings)
         else:
             j_tenv = gateway.jvm.StreamTableEnvironment.create(
                 stream_execution_environment._j_stream_execution_environment)
@@ -770,10 +798,16 @@ class BatchTableEnvironment(TableEnvironment):
 
     def _from_file(self, filename, schema):
         gateway = get_gateway()
-        jds = gateway.jvm.PythonBridgeUtils.createDataSetFromFile(
-            self._j_tenv.execEnv(), filename, True)
-        return Table(gateway.jvm.PythonTableUtils.fromDataSet(
-            self._j_tenv, jds, _to_java_type(schema)))
+        blink_t_env_class = get_java_class(
+            gateway.jvm.org.apache.flink.table.api.internal.TableEnvironmentImpl)
+        if blink_t_env_class == self._j_tenv.getClass():
+            raise NotImplementedError("The operation 'from_elements' in batch mode is currently "
+                                      "not supported when using blink planner.")
+        else:
+            jds = gateway.jvm.PythonBridgeUtils.createDataSetFromFile(
+                self._j_tenv.execEnv(), filename, True)
+            return Table(gateway.jvm.PythonTableUtils.fromDataSet(
+                self._j_tenv, jds, _to_java_type(schema)))
 
     def get_config(self):
         """
@@ -810,38 +844,89 @@ class BatchTableEnvironment(TableEnvironment):
             ...     .register_table_source("MyTable")
 
         :param connector_descriptor: Connector descriptor describing the external system.
-        :return: A :class:`BatchTableDescriptor` used to build the table source/sink.
+        :type connector_descriptor: ConnectorDescriptor
+        :return: A :class:`BatchTableDescriptor` or a :class:`StreamTableDescriptor`
+                 (for blink planner) used to build the table source/sink.
+        :rtype: BatchTableDescriptor or StreamTableDescriptor
         """
-        # type: (ConnectorDescriptor) -> BatchTableDescriptor
-        return BatchTableDescriptor(
-            self._j_tenv.connect(connector_descriptor._j_connector_descriptor))
+        gateway = get_gateway()
+        blink_t_env_class = get_java_class(
+            gateway.jvm.org.apache.flink.table.api.internal.TableEnvironmentImpl)
+        if blink_t_env_class == self._j_tenv.getClass():
+            return StreamTableDescriptor(
+                self._j_tenv.connect(connector_descriptor._j_connector_descriptor))
+        else:
+            return BatchTableDescriptor(
+                self._j_tenv.connect(connector_descriptor._j_connector_descriptor))
 
     @staticmethod
-    def create(execution_environment, table_config=None):
+    def create(execution_environment=None, table_config=None, environment_settings=None):
         """
-        Creates a :class:`TableEnvironment` for a batch :class:`ExecutionEnvironment`.
+        Creates a :class:`BatchTableEnvironment`.
 
         Example:
         ::
 
+            # create with ExecutionEnvironment.
             >>> env = ExecutionEnvironment.get_execution_environment()
             >>> table_env = BatchTableEnvironment.create(env)
+            # create with ExecutionEnvironment and TableConfig.
             >>> table_config = TableConfig()
             >>> table_config.set_null_check(False)
             >>> table_env = BatchTableEnvironment.create(env, table_config)
-
-        :param execution_environment: The batch :class:`ExecutionEnvironment` of the
-                                      TableEnvironment.
+            # create with EnvironmentSettings.
+            >>> environment_settings = EnvironmentSettings.new_instance().in_batch_mode() \\
+            ...     .use_blink_planner().build()
+            >>> table_env = BatchTableEnvironment.create(environment_settings=environment_settings)
+
+        :param execution_environment: The batch :class:`pyflink.dataset.ExecutionEnvironment` of
+                                      the TableEnvironment.
+        :type execution_environment: pyflink.dataset.ExecutionEnvironment
         :param table_config: The configuration of the TableEnvironment, optional.
-        :return: The :class:`BatchTableEnvironment` created from given ExecutionEnvironment and
+        :type table_config: TableConfig
+        :param environment_settings: The environment settings used to instantiate the
+                                     TableEnvironment. It provides the interfaces about planner
+                                     selection (flink or blink), optional.
+        :type environment_settings: pyflink.table.EnvironmentSettings
+        :return: The BatchTableEnvironment created from given ExecutionEnvironment and
                  configuration.
-        """
+        :rtype: BatchTableEnvironment
+        """
+        if execution_environment is None and \
+                table_config is None and \
+                environment_settings is None:
+            raise ValueError("No argument found, the param 'execution_environment' "
+                             "or 'environment_settings' is required.")
+        elif execution_environment is None and \
+                table_config is not None and \
+                environment_settings is None:
+            raise ValueError("Only the param 'table_config' is found, "
+                             "the param 'execution_environment' is also required.")
+        elif execution_environment is not None and \
+                environment_settings is not None:
+            raise ValueError("The param 'execution_environment' and "
+                             "'environment_settings' cannot be used at the same time")
+        elif table_config is not None and \
+                environment_settings is not None:
+            raise ValueError("The param 'table_config' and "
+                             "'environment_settings' cannot be used at the same time")
+
         gateway = get_gateway()
-        if table_config is not None:
-            j_tenv = gateway.jvm.BatchTableEnvironment.create(
-                execution_environment._j_execution_environment,
-                table_config._j_table_config)
-        else:
-            j_tenv = gateway.jvm.BatchTableEnvironment.create(
-                execution_environment._j_execution_environment)
-        return BatchTableEnvironment(j_tenv)
+        if execution_environment is not None and environment_settings is None:
+            if table_config is not None:
+                j_tenv = gateway.jvm.BatchTableEnvironment.create(
+                    execution_environment._j_execution_environment,
+                    table_config._j_table_config)
+            else:
+                j_tenv = gateway.jvm.BatchTableEnvironment.create(
+                    execution_environment._j_execution_environment)
+            return BatchTableEnvironment(j_tenv)
+        elif environment_settings is not None and \
+                execution_environment is None and \
+                table_config is None:
+            if environment_settings.is_streaming_mode():
+                raise ValueError("The environment settings for BatchTableEnvironment must be "
+                                 "set to batch mode.")
+            j_tenv = gateway.jvm.TableEnvironment.create(
+                environment_settings._j_environment_settings)
+            return BatchTableEnvironment(j_tenv)
diff --git a/flink-python/pyflink/table/tests/test_environment_settings.py b/flink-python/pyflink/table/tests/test_environment_settings.py
new file mode 100644
index 0000000..e493107
--- /dev/null
+++ b/flink-python/pyflink/table/tests/test_environment_settings.py
@@ -0,0 +1,133 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from pyflink.java_gateway import get_gateway
+
+from pyflink.table import EnvironmentSettings
+from pyflink.testing.test_case_utils import PyFlinkTestCase, get_private_field
+
+
+class EnvironmentSettingsTests(PyFlinkTestCase):
+
+    def test_planner_selection(self):
+
+        gateway = get_gateway()
+
+        CLASS_NAME = gateway.jvm.EnvironmentSettings.CLASS_NAME
+
+        builder = EnvironmentSettings.new_instance()
+
+        OLD_PLANNER_FACTORY = get_private_field(builder._j_builder, "OLD_PLANNER_FACTORY")
+        OLD_EXECUTOR_FACTORY = get_private_field(builder._j_builder, "OLD_EXECUTOR_FACTORY")
+        BLINK_PLANNER_FACTORY = get_private_field(builder._j_builder, "BLINK_PLANNER_FACTORY")
+        BLINK_EXECUTOR_FACTORY = get_private_field(builder._j_builder, "BLINK_EXECUTOR_FACTORY")
+
+        # test the default behaviour to make sure it is consistent with the python doc
+        envrionment_settings = builder.build()
+
+        self.assertEqual(
+            envrionment_settings._j_environment_settings.toPlannerProperties()[CLASS_NAME],
+            OLD_PLANNER_FACTORY)
+
+        self.assertEqual(
+            envrionment_settings._j_environment_settings.toExecutorProperties()[CLASS_NAME],
+            OLD_EXECUTOR_FACTORY)
+
+        # test use_old_planner
+        envrionment_settings = builder.use_old_planner().build()
+
+        self.assertEqual(
+            envrionment_settings._j_environment_settings.toPlannerProperties()[CLASS_NAME],
+            OLD_PLANNER_FACTORY)
+
+        self.assertEqual(
+            envrionment_settings._j_environment_settings.toExecutorProperties()[CLASS_NAME],
+            OLD_EXECUTOR_FACTORY)
+
+        # test use_blink_planner
+        envrionment_settings = builder.use_blink_planner().build()
+
+        self.assertEqual(
+            envrionment_settings._j_environment_settings.toPlannerProperties()[CLASS_NAME],
+            BLINK_PLANNER_FACTORY)
+
+        self.assertEqual(
+            envrionment_settings._j_environment_settings.toExecutorProperties()[CLASS_NAME],
+            BLINK_EXECUTOR_FACTORY)
+
+        # test use_any_planner
+        envrionment_settings = builder.use_any_planner().build()
+
+        self.assertTrue(
+            CLASS_NAME not in envrionment_settings._j_environment_settings.toPlannerProperties())
+
+        self.assertTrue(
+            CLASS_NAME not in envrionment_settings._j_environment_settings.toExecutorProperties())
+
+    def test_mode_selection(self):
+
+        builder = EnvironmentSettings.new_instance()
+
+        # test the default behaviour to make sure it is consistent with the python doc
+        envrionment_settings = builder.build()
+
+        self.assertTrue(envrionment_settings.is_streaming_mode())
+
+        # test in_streaming_mode
+        envrionment_settings = builder.in_streaming_mode().build()
+
+        self.assertTrue(envrionment_settings.is_streaming_mode())
+
+        # test in_batch_mode
+        envrionment_settings = builder.in_batch_mode().build()
+
+        self.assertFalse(envrionment_settings.is_streaming_mode())
+
+    def test_with_built_in_catalog_name(self):
+
+        gateway = get_gateway()
+
+        DEFAULT_BUILTIN_CATALOG = gateway.jvm.EnvironmentSettings.DEFAULT_BUILTIN_CATALOG
+
+        builder = EnvironmentSettings.new_instance()
+
+        # test the default behaviour to make sure it is consistent with the python doc
+        envrionment_settings = builder.build()
+
+        self.assertEqual(envrionment_settings.get_built_in_catalog_name(), DEFAULT_BUILTIN_CATALOG)
+
+        envrionment_settings = builder.with_built_in_catalog_name("my_catalog").build()
+
+        self.assertEqual(envrionment_settings.get_built_in_catalog_name(), "my_catalog")
+
+    def test_with_built_in_database_name(self):
+
+        gateway = get_gateway()
+
+        DEFAULT_BUILTIN_DATABASE = gateway.jvm.EnvironmentSettings.DEFAULT_BUILTIN_DATABASE
+
+        builder = EnvironmentSettings.new_instance()
+
+        # test the default behaviour to make sure it is consistent with the python doc
+        envrionment_settings = builder.build()
+
+        self.assertEqual(envrionment_settings.get_built_in_database_name(),
+                         DEFAULT_BUILTIN_DATABASE)
+
+        envrionment_settings = builder.with_built_in_database_name("my_database").build()
+
+        self.assertEqual(envrionment_settings.get_built_in_database_name(), "my_database")
diff --git a/flink-python/pyflink/table/tests/test_environment_settings_completeness.py b/flink-python/pyflink/table/tests/test_environment_settings_completeness.py
new file mode 100644
index 0000000..f32e813
--- /dev/null
+++ b/flink-python/pyflink/table/tests/test_environment_settings_completeness.py
@@ -0,0 +1,67 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import unittest
+
+from pyflink.table import EnvironmentSettings
+from pyflink.testing.test_case_utils import PythonAPICompletenessTestCase
+
+
+class EnvironmentSettingsCompletenessTests(PythonAPICompletenessTestCase, unittest.TestCase):
+    """
+    Tests whether the Python :class:`EnvironmentSettings` is consistent with
+    Java `org.apache.flink.table.api.EnvironmentSettings`.
+    """
+
+    @classmethod
+    def python_class(cls):
+        return EnvironmentSettings
+
+    @classmethod
+    def java_class(cls):
+        return "org.apache.flink.table.api.EnvironmentSettings"
+
+    @classmethod
+    def excluded_methods(cls):
+        # internal interfaces, no need to expose to users.
+        return {'toPlannerProperties', 'toExecutorProperties'}
+
+
+class EnvironmentSettingsBuilderCompletenessTests(PythonAPICompletenessTestCase, unittest.TestCase):
+    """
+    Tests whether the Python :class:`EnvironmentSettings.Builder` is consistent with
+    Java `org.apache.flink.table.api.EnvironmentSettings$Builder`.
+    """
+
+    @classmethod
+    def python_class(cls):
+        return EnvironmentSettings.Builder
+
+    @classmethod
+    def java_class(cls):
+        return "org.apache.flink.table.api.EnvironmentSettings$Builder"
+
+
+if __name__ == '__main__':
+    import unittest
+
+    try:
+        import xmlrunner
+        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports')
+    except ImportError:
+        testRunner = None
+    unittest.main(testRunner=testRunner, verbosity=2)
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py
index 3e2dd6f..8bbd491 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -21,7 +21,7 @@ from py4j.compat import unicode
 
 from pyflink.dataset import ExecutionEnvironment
 from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.table import DataTypes, CsvTableSink, StreamTableEnvironment
+from pyflink.table import DataTypes, CsvTableSink, StreamTableEnvironment, EnvironmentSettings
 from pyflink.table.table_config import TableConfig
 from pyflink.table.table_environment import BatchTableEnvironment
 from pyflink.table.types import RowType
@@ -197,6 +197,49 @@ class StreamTableEnvironmentTests(PyFlinkStreamTableTestCase):
         self.assertEqual(readed_table_config.get_max_generated_code_length(), 32000)
         self.assertEqual(readed_table_config.get_local_timezone(), "Asia/Shanghai")
 
+    def test_create_table_environment_with_blink_planner(self):
+        """Blink settings on a stream environment must select the blink StreamPlanner."""
+        blink_settings = EnvironmentSettings.new_instance().use_blink_planner().build()
+        t_env = StreamTableEnvironment.create(self.env, environment_settings=blink_settings)
+
+        planner_class_name = t_env._j_tenv.getPlanner().getClass().getName()
+
+        self.assertEqual(
+            planner_class_name,
+            "org.apache.flink.table.planner.delegation.StreamPlanner")
+
+    def test_table_environment_with_blink_planner(self):
+        """Runs a CSV-to-CSV job on the blink planner and checks the two rows written."""
+        t_env = StreamTableEnvironment.create(
+            self.env,
+            environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())
+
+        # Pass the path components separately so that os.path.join actually joins them.
+        source_path = os.path.join(self.tempdir, 'streaming.csv')
+        sink_path = os.path.join(self.tempdir, 'result.csv')
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING()]
+        data = [(1, 'hi', 'hello'), (2, 'hello', 'hello')]
+        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
+
+        t_env.register_table_source("source", csv_source)
+
+        t_env.register_table_sink(
+            "sink",
+            CsvTableSink(field_names, field_types, sink_path))
+
+        # Add 1 to the first column and pass the other two columns through unchanged.
+        result = t_env.scan("source").alias("a, b, c").select("1 + a, b, c")
+        result.insert_into("sink")
+
+        t_env.execute("blink_test")
+
+        # Only the first two lines of the sink file are inspected.
+        with open(sink_path, 'r') as f:
+            results = [f.readline(), f.readline()]
+
+        self.assert_equals(results, ['2,hi,hello\n', '3,hello,hello\n'])
+
 
 class BatchTableEnvironmentTests(PyFlinkBatchTableTestCase):
 
@@ -273,3 +316,50 @@ class BatchTableEnvironmentTests(PyFlinkBatchTableTestCase):
         self.assertFalse(readed_table_config.get_null_check())
         self.assertEqual(readed_table_config.get_max_generated_code_length(), 32000)
         self.assertEqual(readed_table_config.get_local_timezone(), "Asia/Shanghai")
+
+    def test_create_table_environment_with_blink_planner(self):
+        """Batch-mode blink settings must select the blink BatchPlanner."""
+        blink_batch_settings = \
+            EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
+        t_env = BatchTableEnvironment.create(environment_settings=blink_batch_settings)
+
+        planner_class_name = t_env._j_tenv.getPlanner().getClass().getName()
+        self.assertEqual(
+            planner_class_name,
+            "org.apache.flink.table.planner.delegation.BatchPlanner")
+
+    def test_table_environment_with_blink_planner(self):
+        """Runs a CSV-to-CSV batch job on the blink planner and checks the rows written."""
+        t_env = BatchTableEnvironment.create(
+            environment_settings=EnvironmentSettings.new_instance().in_batch_mode()
+            .use_blink_planner().build())
+
+        # Pass the path components separately so that os.path.join actually joins them.
+        source_path = os.path.join(self.tempdir, 'streaming.csv')
+        sink_path = os.path.join(self.tempdir, 'results')
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING()]
+        data = [(1, 'hi', 'hello'), (2, 'hello', 'hello')]
+        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
+
+        t_env.register_table_source("source", csv_source)
+
+        t_env.register_table_sink(
+            "sink",
+            CsvTableSink(field_names, field_types, sink_path))
+
+        # Add 1 to the first column and pass the other two columns through unchanged.
+        result = t_env.scan("source").alias("a, b, c").select("1 + a, b, c")
+        result.insert_into("sink")
+
+        t_env.execute("blink_test")
+
+        # The sink path is walked as a directory tree here, so gather the lines of
+        # every file found beneath it.
+        results = []
+        for root, _, files in os.walk(sink_path):
+            for sub_file in files:
+                with open(os.path.join(root, sub_file), 'r') as f:
+                    results.extend(f)
+
+        self.assert_equals(results, ['2,hi,hello\n', '3,hello,hello\n'])
diff --git a/flink-python/pyflink/testing/test_case_utils.py b/flink-python/pyflink/testing/test_case_utils.py
index cda1e65..c1d484e 100644
--- a/flink-python/pyflink/testing/test_case_utils.py
+++ b/flink-python/pyflink/testing/test_case_utils.py
@@ -25,6 +25,8 @@ import unittest
 from abc import abstractmethod
 
 from py4j.java_gateway import JavaObject
+from py4j.protocol import Py4JJavaError
+
 from pyflink.table.sources import CsvTableSource
 
 from pyflink.dataset import ExecutionEnvironment
@@ -48,6 +50,23 @@ logging.basicConfig(stream=sys.stdout, level=log_level,
                     format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
 
 
+def get_private_field(java_obj, field_name):
+    """
+    Reads a (possibly inherited) private field of a Java object via reflection.
+
+    Returns None if no class in the hierarchy yields the field.
+    """
+    j_cls = java_obj.getClass()
+    while j_cls is not None:
+        try:
+            j_field = j_cls.getDeclaredField(field_name)
+            j_field.setAccessible(True)
+            return j_field.get(java_obj)
+        except Py4JJavaError:
+            j_cls = j_cls.getSuperclass()
+    return None
+
+
 class PyFlinkTestCase(unittest.TestCase):
     """
     Base class for unit tests.