You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ji...@apache.org on 2019/07/16 00:38:39 UTC

[flink] branch master updated: [FLINK-13263] [python] Supports explain DAG plan in flink-python

This is an automated email from the ASF dual-hosted git repository.

jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new dce0d0a  [FLINK-13263] [python] Supports explain DAG plan in flink-python
dce0d0a is described below

commit dce0d0a37151400d87f0e8a1e002747da9a0a34e
Author: godfreyhe <go...@163.com>
AuthorDate: Mon Jul 15 18:09:31 2019 +0800

    [FLINK-13263] [python] Supports explain DAG plan in flink-python
    
    This closes #9114
---
 flink-python/pyflink/table/table_environment.py    | 14 +++--
 .../table/tests/test_table_environment_api.py      | 69 +++++++++++++++++++++-
 flink-python/pyflink/util/exceptions.py            |  8 +++
 3 files changed, 85 insertions(+), 6 deletions(-)

diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index b5619e8..071c1f5 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -224,15 +224,21 @@ class TableEnvironment(object):
         j_table_name_array = self._j_tenv.listTables()
         return [item for item in j_table_name_array]
 
-    def explain(self, table):
+    def explain(self, table=None, extended=False):
         """
         Returns the AST of the specified Table API and SQL queries and the execution plan to compute
-        the result of the given :class:`Table`.
+        the result of the given :class:`Table` or multi-sinks plan.
 
-        :param table: The table to be explained.
+        :param table: The table to be explained. If table is None, the multi-sinks plan is
+                      explained; otherwise the plan of the given table is explained.
+        :param extended: Whether the plan should contain additional properties,
+                         e.g. estimated cost, traits.
         :return: The table for which the AST and execution plan will be returned.
         """
-        return self._j_tenv.explain(table._j_table)
+        if table is None:
+            return self._j_tenv.explain(extended)
+        else:
+            return self._j_tenv.explain(table._j_table, extended)
 
     def sql_query(self, query):
         """
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py
index 54188ff..ef787f8 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -22,11 +22,13 @@ from py4j.compat import unicode
 
 from pyflink.dataset import ExecutionEnvironment
 from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.table.table_environment import BatchTableEnvironment, StreamTableEnvironment
+from pyflink.table import DataTypes, CsvTableSink, StreamTableEnvironment
 from pyflink.table.table_config import TableConfig
-from pyflink.table.types import DataTypes, RowType
+from pyflink.table.table_environment import BatchTableEnvironment
+from pyflink.table.types import RowType
 from pyflink.testing import source_sink_utils
 from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase, PyFlinkBatchTableTestCase
+from pyflink.util.exceptions import TableException
 
 
 class StreamTableEnvironmentTests(PyFlinkStreamTableTestCase):
@@ -103,6 +105,38 @@ class StreamTableEnvironmentTests(PyFlinkStreamTableTestCase):
 
         assert isinstance(actual, str) or isinstance(actual, unicode)
 
+    def test_explain_with_extended(self):
+        schema = RowType() \
+            .add('a', DataTypes.INT()) \
+            .add('b', DataTypes.STRING()) \
+            .add('c', DataTypes.STRING())
+        t_env = self.t_env
+        t = t_env.from_elements([], schema)
+        result = t.select("1 + a, b, c")
+
+        actual = t_env.explain(result, True)
+
+        assert isinstance(actual, str) or isinstance(actual, unicode)
+
+    def test_explain_with_multi_sinks(self):
+        t_env = self.t_env
+        source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
+        t_env.register_table_sink(
+            "sink1",
+            source_sink_utils.TestAppendSink(field_names, field_types))
+        t_env.register_table_sink(
+            "sink2",
+            source_sink_utils.TestAppendSink(field_names, field_types))
+
+        t_env.sql_update("insert into sink1 select * from %s where a > 100" % source)
+        t_env.sql_update("insert into sink2 select * from %s where a < 100" % source)
+
+        actual = t_env.explain(extended=True)
+
+        assert isinstance(actual, str) or isinstance(actual, unicode)
+
     def test_sql_query(self):
         t_env = self.t_env
         source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
@@ -195,6 +229,37 @@ class BatchTableEnvironmentTests(PyFlinkBatchTableTestCase):
 
         self.assertIsInstance(actual, (str, unicode))
 
+    def test_explain_with_extended(self):
+        schema = RowType() \
+            .add('a', DataTypes.INT()) \
+            .add('b', DataTypes.STRING()) \
+            .add('c', DataTypes.STRING())
+        t_env = self.t_env
+        t = t_env.from_elements([], schema)
+        result = t.select("1 + a, b, c")
+
+        actual = t_env.explain(result, True)
+
+        assert isinstance(actual, str) or isinstance(actual, unicode)
+
+    def test_explain_with_multi_sinks(self):
+        t_env = self.t_env
+        source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
+        t_env.register_table_sink(
+            "sink1",
+            CsvTableSink(field_names, field_types, "path1"))
+        t_env.register_table_sink(
+            "sink2",
+            CsvTableSink(field_names, field_types, "path2"))
+
+        t_env.sql_update("insert into sink1 select * from %s where a > 100" % source)
+        t_env.sql_update("insert into sink2 select * from %s where a < 100" % source)
+
+        with self.assertRaises(TableException):
+            t_env.explain(extended=True)
+
     def test_table_config(self):
 
         table_config = TableConfig()
diff --git a/flink-python/pyflink/util/exceptions.py b/flink-python/pyflink/util/exceptions.py
index 57d17ad..2a28936 100644
--- a/flink-python/pyflink/util/exceptions.py
+++ b/flink-python/pyflink/util/exceptions.py
@@ -28,6 +28,12 @@ class JavaException(Exception):
         return repr(self.msg)
 
 
+class TableException(JavaException):
+    """
+    General Exception for all errors during table handling.
+    """
+
+
 class CatalogException(JavaException):
     """
     A catalog-related exception.
@@ -106,6 +112,8 @@ class TableNotPartitionedException(JavaException):
 
 # Mapping from JavaException to PythonException
 exception_mapping = {
+    "org.apache.flink.table.api.TableException":
+        TableException,
     "org.apache.flink.table.catalog.exceptions.CatalogException":
         CatalogException,
     "org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException":