You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ji...@apache.org on 2019/08/01 14:51:37 UTC
[flink] branch master updated: [FLINK-12704][python] Enable the
configuration of using blink planner.
This is an automated email from the ASF dual-hosted git repository.
jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new da3eb2e [FLINK-12704][python] Enable the configuration of using blink planner.
da3eb2e is described below
commit da3eb2e07e7c9a2aeda2c3bef803624349ae709a
Author: Wei Zhong <we...@gmail.com>
AuthorDate: Thu Aug 1 17:15:29 2019 +0800
[FLINK-12704][python] Enable the configuration of using blink planner.
This closes #9314
---
flink-python/pyflink/table/__init__.py | 4 +
flink-python/pyflink/table/environment_settings.py | 199 +++++++++++++++++++++
flink-python/pyflink/table/table_environment.py | 141 ++++++++++++---
.../table/tests/test_environment_settings.py | 133 ++++++++++++++
.../test_environment_settings_completeness.py | 67 +++++++
.../table/tests/test_table_environment_api.py | 92 +++++++++-
flink-python/pyflink/testing/test_case_utils.py | 19 ++
7 files changed, 626 insertions(+), 29 deletions(-)
diff --git a/flink-python/pyflink/table/__init__.py b/flink-python/pyflink/table/__init__.py
index 48a150e..e69a9b7 100644
--- a/flink-python/pyflink/table/__init__.py
+++ b/flink-python/pyflink/table/__init__.py
@@ -27,6 +27,8 @@ Important classes of Flink Table API:
- :class:`pyflink.table.TableConfig`
A config to define the runtime behavior of the Table API.
It is necessary when creating :class:`TableEnvironment`.
+ - :class:`pyflink.table.EnvironmentSettings`
+ Defines all parameters that initialize a table environment.
- :class:`pyflink.table.StreamQueryConfig` and :class:`pyflink.table.BatchQueryConfig`
A query config holds parameters to configure the behavior of queries.
- :class:`pyflink.table.TableSource`
@@ -53,6 +55,7 @@ Important classes of Flink Table API:
"""
from __future__ import absolute_import
+from pyflink.table.environment_settings import EnvironmentSettings
from pyflink.table.table import Table, GroupedTable, GroupWindowedTable, OverWindowedTable, \
WindowGroupedTable
from pyflink.table.table_config import TableConfig
@@ -67,6 +70,7 @@ __all__ = [
'TableEnvironment',
'StreamTableEnvironment',
'BatchTableEnvironment',
+ 'EnvironmentSettings',
'Table',
'GroupedTable',
'GroupWindowedTable',
diff --git a/flink-python/pyflink/table/environment_settings.py b/flink-python/pyflink/table/environment_settings.py
new file mode 100644
index 0000000..d6fda40
--- /dev/null
+++ b/flink-python/pyflink/table/environment_settings.py
@@ -0,0 +1,199 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from pyflink.java_gateway import get_gateway
+
+__all__ = ['EnvironmentSettings']
+
+
+class EnvironmentSettings(object):
+    """
+    Defines all parameters that initialize a table environment. Those parameters are used only
+    during instantiation of a :class:`~pyflink.table.TableEnvironment` and cannot be changed
+    afterwards.
+
+    Example:
+    ::
+
+        >>> EnvironmentSettings.new_instance() \\
+        ...     .use_old_planner() \\
+        ...     .in_streaming_mode() \\
+        ...     .with_built_in_catalog_name("my_catalog") \\
+        ...     .with_built_in_database_name("my_database") \\
+        ...     .build()
+    """
+
+    class Builder(object):
+        """
+        A builder for :class:`EnvironmentSettings`.
+
+        Every configuration method updates this builder and returns it, so calls can be
+        chained; :func:`build` produces the final settings.
+        """
+
+        def __init__(self):
+            gateway = get_gateway()
+            # The Python builder is a thin wrapper around a Java EnvironmentSettings.Builder.
+            self._j_builder = gateway.jvm.EnvironmentSettings.Builder()
+
+        def use_old_planner(self):
+            """
+            Sets the old Flink planner as the required module.
+
+            This is the default behavior.
+
+            :return: This object.
+            :rtype: EnvironmentSettings.Builder
+            """
+            self._j_builder = self._j_builder.useOldPlanner()
+            return self
+
+        def use_blink_planner(self):
+            """
+            Sets the Blink planner as the required module. By default, :func:`use_old_planner` is
+            enabled.
+
+            :return: This object.
+            :rtype: EnvironmentSettings.Builder
+            """
+            self._j_builder = self._j_builder.useBlinkPlanner()
+            return self
+
+        def use_any_planner(self):
+            """
+            Does not set a planner requirement explicitly.
+
+            A planner will be discovered automatically, if there is only one planner available.
+
+            By default, :func:`use_old_planner` is enabled.
+
+            :return: This object.
+            :rtype: EnvironmentSettings.Builder
+            """
+            self._j_builder = self._j_builder.useAnyPlanner()
+            return self
+
+        def in_batch_mode(self):
+            """
+            Sets that the components should work in batch mode. Streaming mode is the default.
+
+            :return: This object.
+            :rtype: EnvironmentSettings.Builder
+            """
+            self._j_builder = self._j_builder.inBatchMode()
+            return self
+
+        def in_streaming_mode(self):
+            """
+            Sets that the components should work in streaming mode. Enabled by default.
+
+            :return: This object.
+            :rtype: EnvironmentSettings.Builder
+            """
+            self._j_builder = self._j_builder.inStreamingMode()
+            return self
+
+        def with_built_in_catalog_name(self, built_in_catalog_name):
+            """
+            Specifies the name of the initial catalog to be created when instantiating
+            a :class:`~pyflink.table.TableEnvironment`. This catalog will be used to store all
+            non-serializable objects such as tables and functions registered via e.g.
+            :func:`~pyflink.table.TableEnvironment.register_table_sink` or
+            :func:`~pyflink.table.TableEnvironment.register_java_function`. It will also be the
+            initial value for the current catalog which can be altered via
+            :func:`~pyflink.table.TableEnvironment.use_catalog`.
+
+            Default: "default_catalog".
+
+            :param built_in_catalog_name: The specified built-in catalog name.
+            :type built_in_catalog_name: str
+            :return: This object.
+            :rtype: EnvironmentSettings.Builder
+            """
+            self._j_builder = self._j_builder.withBuiltInCatalogName(built_in_catalog_name)
+            return self
+
+        def with_built_in_database_name(self, built_in_database_name):
+            """
+            Specifies the name of the default database in the initial catalog to be
+            created when instantiating a :class:`~pyflink.table.TableEnvironment`. The database
+            will be used to store all non-serializable objects such as tables and functions
+            registered via e.g. :func:`~pyflink.table.TableEnvironment.register_table_sink` or
+            :func:`~pyflink.table.TableEnvironment.register_java_function`. It will also be the
+            initial value for the current database which can be altered via
+            :func:`~pyflink.table.TableEnvironment.use_database`.
+
+            Default: "default_database".
+
+            :param built_in_database_name: The specified built-in database name.
+            :type built_in_database_name: str
+            :return: This object.
+            :rtype: EnvironmentSettings.Builder
+            """
+            self._j_builder = self._j_builder.withBuiltInDatabaseName(built_in_database_name)
+            return self
+
+        def build(self):
+            """
+            Returns an immutable instance of EnvironmentSettings.
+
+            :return: an immutable instance of EnvironmentSettings.
+            :rtype: EnvironmentSettings
+            """
+            return EnvironmentSettings(self._j_builder.build())
+
+    def __init__(self, j_environment_settings):
+        # Wraps the Java EnvironmentSettings produced by Builder.build(); users are expected
+        # to obtain instances through new_instance() rather than calling this directly.
+        self._j_environment_settings = j_environment_settings
+
+    def get_built_in_catalog_name(self):
+        """
+        Gets the specified name of the initial catalog to be created when instantiating a
+        :class:`~pyflink.table.TableEnvironment`.
+
+        :return: The specified name of the initial catalog to be created.
+        :rtype: str
+        """
+        return self._j_environment_settings.getBuiltInCatalogName()
+
+    def get_built_in_database_name(self):
+        """
+        Gets the specified name of the default database in the initial catalog to be created when
+        instantiating a :class:`~pyflink.table.TableEnvironment`.
+
+        :return: The specified name of the default database in the initial catalog to be created.
+        :rtype: str
+        """
+        return self._j_environment_settings.getBuiltInDatabaseName()
+
+    def is_streaming_mode(self):
+        """
+        Tells if the :class:`~pyflink.table.TableEnvironment` should work in a batch or streaming
+        mode.
+
+        :return: True if the TableEnvironment should work in a streaming mode, False otherwise.
+        :rtype: bool
+        """
+        return self._j_environment_settings.isStreamingMode()
+
+    @staticmethod
+    def new_instance():
+        """
+        Creates a builder for creating an instance of EnvironmentSettings.
+
+        Unless overridden on the builder, the resulting settings use the old Flink planner in
+        streaming mode, with "default_catalog" as the built-in catalog name and
+        "default_database" as the built-in database name.
+
+        :return: A builder of EnvironmentSettings.
+        :rtype: EnvironmentSettings.Builder
+        """
+        return EnvironmentSettings.Builder()
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index 2c23deb..b7ad5b5 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -19,6 +19,8 @@ import os
import tempfile
from abc import ABCMeta, abstractmethod
+from py4j.java_gateway import get_java_class
+
from pyflink.serializers import BatchedSerializer, PickleSerializer
from pyflink.table.catalog import Catalog
from pyflink.table.table_config import TableConfig
@@ -730,32 +732,58 @@ class StreamTableEnvironment(TableEnvironment):
self._j_tenv.connect(connector_descriptor._j_connector_descriptor))
@staticmethod
- def create(stream_execution_environment, table_config=None):
+ def create(stream_execution_environment, table_config=None, environment_settings=None):
"""
- Creates a :class:`TableEnvironment` for a :class:`StreamExecutionEnvironment`
+ Creates a :class:`TableEnvironment` for a
+ :class:`~pyflink.datastream.StreamExecutionEnvironment`.
Example:
::
>>> env = StreamExecutionEnvironment.get_execution_environment()
- # create without TableConfig
+ # create without optional parameters.
>>> table_env = StreamTableEnvironment.create(env)
# create with TableConfig
>>> table_config = TableConfig()
>>> table_config.set_null_check(False)
>>> table_env = StreamTableEnvironment.create(env, table_config)
+ # create with EnvrionmentSettings
+ >>> environment_settings = EnvironmentSettings.new_instance().use_blink_planner() \\
+ ... .build()
+ >>> table_env = StreamTableEnvironment.create(
+ ... env, environment_settings=environment_settings)
+
- :param stream_execution_environment: The :class:`StreamExecutionEnvironment` of the
- TableEnvironment.
+ :param stream_execution_environment: The
+ :class:`~pyflink.datastream.StreamExecutionEnvironment`
+ of the TableEnvironment.
+ :type stream_execution_environment: pyflink.datastream.StreamExecutionEnvironment
:param table_config: The configuration of the TableEnvironment, optional.
+ :type table_config: TableConfig
+ :param environment_settings: The environment settings used to instantiate the
+ TableEnvironment. It provides the interfaces about planner
+ selection(flink or blink), optional.
+ :type environment_settings: pyflink.table.EnvironmentSettings
:return: The :class:`StreamTableEnvironment` created from given StreamExecutionEnvironment
and configuration.
+ :rtype: StreamTableEnvironment
"""
+ if table_config is not None and environment_settings is not None:
+ raise ValueError("The param 'table_config' and "
+ "'environment_settings' cannot be used at the same time")
+
gateway = get_gateway()
if table_config is not None:
j_tenv = gateway.jvm.StreamTableEnvironment.create(
stream_execution_environment._j_stream_execution_environment,
table_config._j_table_config)
+ elif environment_settings is not None:
+ if not environment_settings.is_streaming_mode():
+ raise ValueError("The environment settings for StreamTableEnvironment must be "
+ "set to streaming mode.")
+ j_tenv = gateway.jvm.StreamTableEnvironment.create(
+ stream_execution_environment._j_stream_execution_environment,
+ environment_settings._j_environment_settings)
else:
j_tenv = gateway.jvm.StreamTableEnvironment.create(
stream_execution_environment._j_stream_execution_environment)
@@ -770,10 +798,16 @@ class BatchTableEnvironment(TableEnvironment):
def _from_file(self, filename, schema):
gateway = get_gateway()
- jds = gateway.jvm.PythonBridgeUtils.createDataSetFromFile(
- self._j_tenv.execEnv(), filename, True)
- return Table(gateway.jvm.PythonTableUtils.fromDataSet(
- self._j_tenv, jds, _to_java_type(schema)))
+ blink_t_env_class = get_java_class(
+ gateway.jvm.org.apache.flink.table.api.internal.TableEnvironmentImpl)
+ if blink_t_env_class == self._j_tenv.getClass():
+ raise NotImplementedError("The operation 'from_elements' in batch mode is currently "
+ "not supported when using blink planner.")
+ else:
+ jds = gateway.jvm.PythonBridgeUtils.createDataSetFromFile(
+ self._j_tenv.execEnv(), filename, True)
+ return Table(gateway.jvm.PythonTableUtils.fromDataSet(
+ self._j_tenv, jds, _to_java_type(schema)))
def get_config(self):
"""
@@ -810,38 +844,89 @@ class BatchTableEnvironment(TableEnvironment):
... .register_table_source("MyTable")
:param connector_descriptor: Connector descriptor describing the external system.
- :return: A :class:`BatchTableDescriptor` used to build the table source/sink.
+ :type connector_descriptor: ConnectorDescriptor
+ :return: A :class:`BatchTableDescriptor` or a :class:`StreamTableDescriptor`
+ (for blink planner) used to build the table source/sink.
+ :rtype: BatchTableDescriptor or StreamTableDescriptor
"""
- # type: (ConnectorDescriptor) -> BatchTableDescriptor
- return BatchTableDescriptor(
- self._j_tenv.connect(connector_descriptor._j_connector_descriptor))
+ gateway = get_gateway()
+ blink_t_env_class = get_java_class(
+ gateway.jvm.org.apache.flink.table.api.internal.TableEnvironmentImpl)
+ if blink_t_env_class == self._j_tenv.getClass():
+ return StreamTableDescriptor(
+ self._j_tenv.connect(connector_descriptor._j_connector_descriptor))
+ else:
+ return BatchTableDescriptor(
+ self._j_tenv.connect(connector_descriptor._j_connector_descriptor))
@staticmethod
- def create(execution_environment, table_config=None):
+ def create(execution_environment=None, table_config=None, environment_settings=None):
"""
- Creates a :class:`TableEnvironment` for a batch :class:`ExecutionEnvironment`.
+ Creates a :class:`BatchTableEnvironment`.
Example:
::
+ # create with ExecutionEnvironment.
>>> env = ExecutionEnvironment.get_execution_environment()
>>> table_env = BatchTableEnvironment.create(env)
+ # create with ExecutionEnvironment and TableConfig.
>>> table_config = TableConfig()
>>> table_config.set_null_check(False)
>>> table_env = BatchTableEnvironment.create(env, table_config)
-
- :param execution_environment: The batch :class:`ExecutionEnvironment` of the
- TableEnvironment.
+ # create with EnvironmentSettings.
+ >>> environment_settings = EnvironmentSettings.new_instance().in_batch_mode() \\
+ ... .use_blink_planner().build()
+ >>> table_env = BatchTableEnvironment.create(environment_settings=environment_settings)
+
+ :param execution_environment: The batch :class:`pyflink.dataset.ExecutionEnvironment` of
+ the TableEnvironment.
+ :type execution_environment: pyflink.dataset.ExecutionEnvironment
:param table_config: The configuration of the TableEnvironment, optional.
- :return: The :class:`BatchTableEnvironment` created from given ExecutionEnvironment and
+ :type table_config: TableConfig
+ :param environment_settings: The environment settings used to instantiate the
+ TableEnvironment. It provides the interfaces about planner
+ selection(flink or blink), optional.
+ :type environment_settings: pyflink.table.EnvironmentSettings
+ :return: The BatchTableEnvironment created from given ExecutionEnvironment and
configuration.
- """
+ :rtype: BatchTableEnvironment
+ """
+ if execution_environment is None and \
+ table_config is None and \
+ environment_settings is None:
+ raise ValueError("No argument found, the param 'execution_environment' "
+ "or 'environment_settings' is required.")
+ elif execution_environment is None and \
+ table_config is not None and \
+ environment_settings is None:
+ raise ValueError("Only the param 'table_config' is found, "
+ "the param 'execution_environment' is also required.")
+ elif execution_environment is not None and \
+ environment_settings is not None:
+ raise ValueError("The param 'execution_environment' and "
+ "'environment_settings' cannot be used at the same time")
+ elif table_config is not None and \
+ environment_settings is not None:
+ raise ValueError("The param 'table_config' and "
+ "'environment_settings' cannot be used at the same time")
+
gateway = get_gateway()
- if table_config is not None:
- j_tenv = gateway.jvm.BatchTableEnvironment.create(
- execution_environment._j_execution_environment,
- table_config._j_table_config)
- else:
- j_tenv = gateway.jvm.BatchTableEnvironment.create(
- execution_environment._j_execution_environment)
- return BatchTableEnvironment(j_tenv)
+ if execution_environment is not None and environment_settings is None:
+ if table_config is not None:
+ j_tenv = gateway.jvm.BatchTableEnvironment.create(
+ execution_environment._j_execution_environment,
+ table_config._j_table_config)
+ else:
+ j_tenv = gateway.jvm.BatchTableEnvironment.create(
+ execution_environment._j_execution_environment)
+ return BatchTableEnvironment(j_tenv)
+ elif environment_settings is not None and \
+ execution_environment is None and \
+ table_config is None:
+ if environment_settings.is_streaming_mode():
+ raise ValueError("The environment settings for BatchTableEnvironment must be "
+ "set to batch mode.")
+ j_tenv = gateway.jvm.TableEnvironment.create(
+ environment_settings._j_environment_settings)
+ return BatchTableEnvironment(j_tenv)
diff --git a/flink-python/pyflink/table/tests/test_environment_settings.py b/flink-python/pyflink/table/tests/test_environment_settings.py
new file mode 100644
index 0000000..e493107
--- /dev/null
+++ b/flink-python/pyflink/table/tests/test_environment_settings.py
@@ -0,0 +1,133 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from pyflink.java_gateway import get_gateway
+
+from pyflink.table import EnvironmentSettings
+from pyflink.testing.test_case_utils import PyFlinkTestCase, get_private_field
+
+
+class EnvironmentSettingsTests(PyFlinkTestCase):
+    """
+    Checks that the Python EnvironmentSettings wrapper forwards planner, mode, catalog and
+    database choices to the Java settings, and that the defaults match the Python docs.
+    """
+
+    def _assert_factories(self, settings, class_name, planner_factory, executor_factory):
+        # Compare the factory classes recorded in the Java planner/executor properties.
+        j_settings = settings._j_environment_settings
+        self.assertEqual(j_settings.toPlannerProperties()[class_name], planner_factory)
+        self.assertEqual(j_settings.toExecutorProperties()[class_name], executor_factory)
+
+    def test_planner_selection(self):
+        class_name = get_gateway().jvm.EnvironmentSettings.CLASS_NAME
+
+        builder = EnvironmentSettings.new_instance()
+        j_builder = builder._j_builder
+        old_planner, old_executor, blink_planner, blink_executor = [
+            get_private_field(j_builder, field_name)
+            for field_name in ("OLD_PLANNER_FACTORY", "OLD_EXECUTOR_FACTORY",
+                               "BLINK_PLANNER_FACTORY", "BLINK_EXECUTOR_FACTORY")]
+
+        # default: must be consistent with the python doc, i.e. the old planner
+        self._assert_factories(builder.build(), class_name, old_planner, old_executor)
+
+        # use_old_planner
+        self._assert_factories(
+            builder.use_old_planner().build(), class_name, old_planner, old_executor)
+
+        # use_blink_planner
+        self._assert_factories(
+            builder.use_blink_planner().build(), class_name, blink_planner, blink_executor)
+
+        # use_any_planner: no factory class is pinned
+        j_any_settings = builder.use_any_planner().build()._j_environment_settings
+        self.assertNotIn(class_name, j_any_settings.toPlannerProperties())
+        self.assertNotIn(class_name, j_any_settings.toExecutorProperties())
+
+    def test_mode_selection(self):
+        builder = EnvironmentSettings.new_instance()
+
+        # default: must be consistent with the python doc, i.e. streaming mode
+        self.assertTrue(builder.build().is_streaming_mode())
+
+        self.assertTrue(builder.in_streaming_mode().build().is_streaming_mode())
+
+        self.assertFalse(builder.in_batch_mode().build().is_streaming_mode())
+
+    def test_with_built_in_catalog_name(self):
+        default_catalog = get_gateway().jvm.EnvironmentSettings.DEFAULT_BUILTIN_CATALOG
+
+        builder = EnvironmentSettings.new_instance()
+
+        # default: must be consistent with the python doc
+        self.assertEqual(builder.build().get_built_in_catalog_name(), default_catalog)
+
+        custom = builder.with_built_in_catalog_name("my_catalog").build()
+        self.assertEqual(custom.get_built_in_catalog_name(), "my_catalog")
+
+    def test_with_built_in_database_name(self):
+        default_database = get_gateway().jvm.EnvironmentSettings.DEFAULT_BUILTIN_DATABASE
+
+        builder = EnvironmentSettings.new_instance()
+
+        # default: must be consistent with the python doc
+        self.assertEqual(builder.build().get_built_in_database_name(), default_database)
+
+        custom = builder.with_built_in_database_name("my_database").build()
+        self.assertEqual(custom.get_built_in_database_name(), "my_database")
diff --git a/flink-python/pyflink/table/tests/test_environment_settings_completeness.py b/flink-python/pyflink/table/tests/test_environment_settings_completeness.py
new file mode 100644
index 0000000..f32e813
--- /dev/null
+++ b/flink-python/pyflink/table/tests/test_environment_settings_completeness.py
@@ -0,0 +1,67 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import unittest
+
+from pyflink.table import EnvironmentSettings
+from pyflink.testing.test_case_utils import PythonAPICompletenessTestCase
+
+
+class EnvironmentSettingsCompletenessTests(PythonAPICompletenessTestCase, unittest.TestCase):
+    """
+    Verifies that the Python :class:`EnvironmentSettings` stays consistent with the
+    Java `org.apache.flink.table.api.EnvironmentSettings`.
+    """
+
+    @classmethod
+    def java_class(cls):
+        return "org.apache.flink.table.api.EnvironmentSettings"
+
+    @classmethod
+    def python_class(cls):
+        return EnvironmentSettings
+
+    @classmethod
+    def excluded_methods(cls):
+        # Internal Java conversion methods; intentionally not exposed to Python users.
+        internal_methods = {'toPlannerProperties', 'toExecutorProperties'}
+        return internal_methods
+
+
+class EnvironmentSettingsBuilderCompletenessTests(PythonAPICompletenessTestCase, unittest.TestCase):
+    """
+    Verifies that the Python :class:`EnvironmentSettings.Builder` stays consistent with the
+    Java `org.apache.flink.table.api.EnvironmentSettings$Builder`.
+    """
+
+    @classmethod
+    def java_class(cls):
+        return "org.apache.flink.table.api.EnvironmentSettings$Builder"
+
+    @classmethod
+    def python_class(cls):
+        return EnvironmentSettings.Builder
+
+
+if __name__ == '__main__':
+    # unittest is already imported at module level. Emit JUnit-style XML reports when
+    # xmlrunner is installed; otherwise unittest.main falls back to its default runner.
+    try:
+        import xmlrunner
+        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports')
+    except ImportError:
+        testRunner = None
+    unittest.main(testRunner=testRunner, verbosity=2)
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py
index 3e2dd6f..8bbd491 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -21,7 +21,7 @@ from py4j.compat import unicode
from pyflink.dataset import ExecutionEnvironment
from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.table import DataTypes, CsvTableSink, StreamTableEnvironment
+from pyflink.table import DataTypes, CsvTableSink, StreamTableEnvironment, EnvironmentSettings
from pyflink.table.table_config import TableConfig
from pyflink.table.table_environment import BatchTableEnvironment
from pyflink.table.types import RowType
@@ -197,6 +197,49 @@ class StreamTableEnvironmentTests(PyFlinkStreamTableTestCase):
self.assertEqual(readed_table_config.get_max_generated_code_length(), 32000)
self.assertEqual(readed_table_config.get_local_timezone(), "Asia/Shanghai")
+    def test_create_table_environment_with_blink_planner(self):
+        settings = EnvironmentSettings.new_instance().use_blink_planner().build()
+        t_env = StreamTableEnvironment.create(self.env, environment_settings=settings)
+
+        # The environment must be backed by the blink StreamPlanner.
+        planner_class_name = t_env._j_tenv.getPlanner().getClass().getName()
+        self.assertEqual(planner_class_name,
+                         "org.apache.flink.table.planner.delegation.StreamPlanner")
+
+    def test_table_environment_with_blink_planner(self):
+        """Runs a small CSV-to-CSV streaming job on the blink planner and checks the output."""
+        t_env = StreamTableEnvironment.create(
+            self.env,
+            environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())
+
+        # Build paths with os.path.join instead of concatenating a '/' separator by hand.
+        source_path = os.path.join(self.tempdir, 'streaming.csv')
+        sink_path = os.path.join(self.tempdir, 'result.csv')
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING()]
+        data = [(1, 'hi', 'hello'), (2, 'hello', 'hello')]
+        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
+
+        t_env.register_table_source("source", csv_source)
+
+        t_env.register_table_sink(
+            "sink",
+            CsvTableSink(field_names, field_types, sink_path))
+        source = t_env.scan("source")
+
+        result = source.alias("a, b, c").select("1 + a, b, c")
+
+        result.insert_into("sink")
+
+        t_env.execute("blink_test")
+
+        results = []
+        with open(sink_path, 'r') as f:
+            results.append(f.readline())
+            results.append(f.readline())
+
+        self.assert_equals(results, ['2,hi,hello\n', '3,hello,hello\n'])
+
class BatchTableEnvironmentTests(PyFlinkBatchTableTestCase):
@@ -273,3 +316,50 @@ class BatchTableEnvironmentTests(PyFlinkBatchTableTestCase):
self.assertFalse(readed_table_config.get_null_check())
self.assertEqual(readed_table_config.get_max_generated_code_length(), 32000)
self.assertEqual(readed_table_config.get_local_timezone(), "Asia/Shanghai")
+
+    def test_create_table_environment_with_blink_planner(self):
+        settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
+        t_env = BatchTableEnvironment.create(environment_settings=settings)
+
+        # A batch environment built from these settings must run on the blink BatchPlanner.
+        planner_class_name = t_env._j_tenv.getPlanner().getClass().getName()
+        self.assertEqual(planner_class_name,
+                         "org.apache.flink.table.planner.delegation.BatchPlanner")
+
+    def test_table_environment_with_blink_planner(self):
+        """Runs a small CSV-to-CSV batch job on the blink planner and checks the output."""
+        t_env = BatchTableEnvironment.create(
+            environment_settings=EnvironmentSettings.new_instance().in_batch_mode()
+            .use_blink_planner().build())
+
+        # Build paths with os.path.join instead of concatenating a '/' separator by hand.
+        source_path = os.path.join(self.tempdir, 'streaming.csv')
+        sink_path = os.path.join(self.tempdir, 'results')
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING()]
+        data = [(1, 'hi', 'hello'), (2, 'hello', 'hello')]
+        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
+
+        t_env.register_table_source("source", csv_source)
+
+        t_env.register_table_sink(
+            "sink",
+            CsvTableSink(field_names, field_types, sink_path))
+        source = t_env.scan("source")
+
+        result = source.alias("a, b, c").select("1 + a, b, c")
+
+        result.insert_into("sink")
+
+        t_env.execute("blink_test")
+
+        # The sink may produce either a single file or a directory of part files (presumably
+        # depending on the sink parallelism). os.walk yields nothing for a plain file, so that
+        # case is handled explicitly.
+        if os.path.isdir(sink_path):
+            result_files = [os.path.join(root, name)
+                            for root, _, names in os.walk(sink_path) for name in names]
+        else:
+            result_files = [sink_path]
+
+        results = []
+        for result_file in result_files:
+            with open(result_file, 'r') as f:
+                results.extend(f.readlines())
+
+        # File listing order is not guaranteed, so compare the collected lines in sorted order.
+        self.assert_equals(sorted(results), ['2,hi,hello\n', '3,hello,hello\n'])
diff --git a/flink-python/pyflink/testing/test_case_utils.py b/flink-python/pyflink/testing/test_case_utils.py
index cda1e65..c1d484e 100644
--- a/flink-python/pyflink/testing/test_case_utils.py
+++ b/flink-python/pyflink/testing/test_case_utils.py
@@ -25,6 +25,8 @@ import unittest
from abc import abstractmethod
from py4j.java_gateway import JavaObject
+from py4j.protocol import Py4JJavaError
+
from pyflink.table.sources import CsvTableSource
from pyflink.dataset import ExecutionEnvironment
@@ -48,6 +50,23 @@ logging.basicConfig(stream=sys.stdout, level=log_level,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
+def get_private_field(java_obj, field_name):
+    """
+    Reads the value of a (possibly private) field of a Java object via Py4J reflection.
+
+    The field is looked up on the object's runtime class first and then on each superclass in
+    turn, so fields declared by a parent class are found as well.
+
+    :param java_obj: The Py4J proxy of the Java object whose field should be read.
+    :param field_name: The name of the declared field.
+    :return: The value of the field.
+    :raises AttributeError: If no class in the hierarchy declares a field with that name.
+    """
+    cls = java_obj.getClass()
+    while cls is not None:
+        try:
+            field = cls.getDeclaredField(field_name)
+        except Py4JJavaError:
+            # Not declared on this class (Java raises NoSuchFieldException); go up one level.
+            cls = cls.getSuperclass()
+            continue
+        field.setAccessible(True)
+        return field.get(java_obj)
+    # Previously this fell through and returned None, which hid typos in field names.
+    raise AttributeError("Field '%s' is not declared in the class hierarchy of %s"
+                         % (field_name, java_obj.getClass().getName()))
+
+
class PyFlinkTestCase(unittest.TestCase):
"""
Base class for unit tests.