You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ji...@apache.org on 2019/07/16 00:48:56 UTC
[flink] branch release-1.9 updated: [FLINK-13263] [python] Supports
explain DAG plan in flink-python
This is an automated email from the ASF dual-hosted git repository.
jincheng pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new 4497072 [FLINK-13263] [python] Supports explain DAG plan in flink-python
4497072 is described below
commit 4497072f7baae228c506385fb9f1c2c0450ef55c
Author: godfreyhe <go...@163.com>
AuthorDate: Mon Jul 15 18:09:31 2019 +0800
[FLINK-13263] [python] Supports explain DAG plan in flink-python
This closes #9114
---
flink-python/pyflink/table/table_environment.py | 14 +++--
.../table/tests/test_table_environment_api.py | 69 +++++++++++++++++++++-
flink-python/pyflink/util/exceptions.py | 8 +++
3 files changed, 85 insertions(+), 6 deletions(-)
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index b5619e8..071c1f5 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -224,15 +224,21 @@ class TableEnvironment(object):
j_table_name_array = self._j_tenv.listTables()
return [item for item in j_table_name_array]
- def explain(self, table):
+ def explain(self, table=None, extended=False):
"""
Returns the AST of the specified Table API and SQL queries and the execution plan to compute
- the result of the given :class:`Table`.
+ the result of the given :class:`Table` or multi-sinks plan.
- :param table: The table to be explained.
+ :param table: The table to be explained. If table is None, the multi-sinks plan is
+ explained; otherwise the given table is explained.
+ :param extended: Whether the plan should contain additional properties,
+ e.g. estimated cost, traits.
:return: The AST and execution plan of the given table or multi-sinks plan, as a string.
"""
- return self._j_tenv.explain(table._j_table)
+ if table is None:
+ return self._j_tenv.explain(extended)
+ else:
+ return self._j_tenv.explain(table._j_table, extended)
def sql_query(self, query):
"""
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py
index 54188ff..ef787f8 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -22,11 +22,13 @@ from py4j.compat import unicode
from pyflink.dataset import ExecutionEnvironment
from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.table.table_environment import BatchTableEnvironment, StreamTableEnvironment
+from pyflink.table import DataTypes, CsvTableSink, StreamTableEnvironment
from pyflink.table.table_config import TableConfig
-from pyflink.table.types import DataTypes, RowType
+from pyflink.table.table_environment import BatchTableEnvironment
+from pyflink.table.types import RowType
from pyflink.testing import source_sink_utils
from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase, PyFlinkBatchTableTestCase
+from pyflink.util.exceptions import TableException
class StreamTableEnvironmentTests(PyFlinkStreamTableTestCase):
@@ -103,6 +105,38 @@ class StreamTableEnvironmentTests(PyFlinkStreamTableTestCase):
assert isinstance(actual, str) or isinstance(actual, unicode)
+ def test_explain_with_extended(self):
+ schema = RowType() \
+ .add('a', DataTypes.INT()) \
+ .add('b', DataTypes.STRING()) \
+ .add('c', DataTypes.STRING())
+ t_env = self.t_env
+ t = t_env.from_elements([], schema)
+ result = t.select("1 + a, b, c")
+
+ actual = t_env.explain(result, True)
+
+ assert isinstance(actual, str) or isinstance(actual, unicode)
+
+ def test_explain_with_multi_sinks(self):
+ t_env = self.t_env
+ source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
+ field_names = ["a", "b", "c"]
+ field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
+ t_env.register_table_sink(
+ "sink1",
+ source_sink_utils.TestAppendSink(field_names, field_types))
+ t_env.register_table_sink(
+ "sink2",
+ source_sink_utils.TestAppendSink(field_names, field_types))
+
+ t_env.sql_update("insert into sink1 select * from %s where a > 100" % source)
+ t_env.sql_update("insert into sink2 select * from %s where a < 100" % source)
+
+ actual = t_env.explain(extended=True)
+
+ assert isinstance(actual, str) or isinstance(actual, unicode)
+
def test_sql_query(self):
t_env = self.t_env
source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
@@ -195,6 +229,37 @@ class BatchTableEnvironmentTests(PyFlinkBatchTableTestCase):
self.assertIsInstance(actual, (str, unicode))
+ def test_explain_with_extended(self):
+ schema = RowType() \
+ .add('a', DataTypes.INT()) \
+ .add('b', DataTypes.STRING()) \
+ .add('c', DataTypes.STRING())
+ t_env = self.t_env
+ t = t_env.from_elements([], schema)
+ result = t.select("1 + a, b, c")
+
+ actual = t_env.explain(result, True)
+
+ assert isinstance(actual, str) or isinstance(actual, unicode)
+
+ def test_explain_with_multi_sinks(self):
+ t_env = self.t_env
+ source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
+ field_names = ["a", "b", "c"]
+ field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
+ t_env.register_table_sink(
+ "sink1",
+ CsvTableSink(field_names, field_types, "path1"))
+ t_env.register_table_sink(
+ "sink2",
+ CsvTableSink(field_names, field_types, "path2"))
+
+ t_env.sql_update("insert into sink1 select * from %s where a > 100" % source)
+ t_env.sql_update("insert into sink2 select * from %s where a < 100" % source)
+
+ with self.assertRaises(TableException):
+ t_env.explain(extended=True)
+
def test_table_config(self):
table_config = TableConfig()
diff --git a/flink-python/pyflink/util/exceptions.py b/flink-python/pyflink/util/exceptions.py
index 57d17ad..2a28936 100644
--- a/flink-python/pyflink/util/exceptions.py
+++ b/flink-python/pyflink/util/exceptions.py
@@ -28,6 +28,12 @@ class JavaException(Exception):
return repr(self.msg)
+class TableException(JavaException):
+ """
+ General Exception for all errors during table handling.
+ """
+
+
class CatalogException(JavaException):
"""
A catalog-related exception.
@@ -106,6 +112,8 @@ class TableNotPartitionedException(JavaException):
# Mapping from JavaException to PythonException
exception_mapping = {
+ "org.apache.flink.table.api.TableException":
+ TableException,
"org.apache.flink.table.catalog.exceptions.CatalogException":
CatalogException,
"org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException":