You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2019/07/10 05:30:28 UTC

[flink] branch master updated: [hotfix][python] Fix the documentation issue

This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c3eb3a3  [hotfix][python] Fix the documentation issue
c3eb3a3 is described below

commit c3eb3a32d2a70ed4eaafa6a77b62d659519522ab
Author: Dian Fu <fu...@alibaba-inc.com>
AuthorDate: Wed Jul 10 11:54:29 2019 +0800

    [hotfix][python] Fix the documentation issue
---
 flink-python/pyflink/table/table_environment.py    | 130 +++++++--------------
 .../table/tests/test_environment_completeness.py   |   3 +-
 .../table/tests/test_table_environment_api.py      |  23 +---
 3 files changed, 46 insertions(+), 110 deletions(-)

diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index 4b248f9..f3f6497 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -21,7 +21,6 @@ from abc import ABCMeta, abstractmethod
 
 from pyflink.serializers import BatchedSerializer, PickleSerializer
 from pyflink.table.catalog import Catalog
-from pyflink.table.query_config import QueryConfig
 from pyflink.table.table_config import TableConfig
 from pyflink.table.descriptors import (StreamTableDescriptor, ConnectorDescriptor,
                                        BatchTableDescriptor)
@@ -244,50 +243,56 @@ class TableEnvironment(object):
 
         A DDL statement can execute to create/drop a table/view:
         For example, the below DDL statement would create a CSV table named `tbl1`
-        into the current catalog:
+        into the current catalog::
 
-        create table tbl1(
-            a int,
-            b bigint,
-            c varchar
-        ) with (
-            connector = 'csv',
-            csv.path = 'xxx'
-        )
-
-        The returns table format for different kind of statement:
-        DDL: returns null.
-        DML: a sql insert returns null; a sql query(select) returns a table
-        to describe the query data set, it can be further queried through the Table API,
-        or directly write to sink with `~Table.insert_into`.
-
-        SQL queries can directly execute as follows:
-        ::
-        >>> sinkDDL =
-            "create table sinkTable(
+            create table tbl1(
                 a int,
-                b varchar
+                b bigint,
+                c varchar
             ) with (
                 connector = 'csv',
                 csv.path = 'xxx'
-            )"
+            )
 
-        >>> sourceDDL =
-            "create table sourceTable(
-                a int,
-                b varchar
-            ) with (
-                connector = 'kafka',
-                kafka.topic = 'xxx',
-                kafka.endpoint = 'x.x.x'
-            )"
+        The return value for each kind of statement:
 
-        query = "INSERT INTO sinkTable SELECT FROM sourceTable"
+        DDL: returns None.
 
-        tEnv.sql(sourceDDL)
-        tEnv.sql(sinkDDL)
-        tEnv.sql(query)
-        tEnv.execute("MyJob")
+        DML: a sql insert returns None; a sql query(select) returns a table
+        describing the query data set, which can be further queried through the Table API
+        or written directly to a sink with :func:`Table.insert_into`.
+
+        SQL queries can directly execute as follows:
+        ::
+
+            >>> source_ddl = \\
+            ... '''
+            ... create table sourceTable(
+            ...     a int,
+            ...     b varchar
+            ... ) with (
+            ...     connector = 'kafka',
+            ...     kafka.topic = 'xxx',
+            ...     kafka.endpoint = 'x.x.x'
+            ... )
+            ... '''
+
+            >>> sink_ddl = \\
+            ... '''
+            ... create table sinkTable(
+            ...     a int,
+            ...     b varchar
+            ... ) with (
+            ...     connector = 'csv',
+            ...     csv.path = 'xxx'
+            ... )
+            ... '''
+
+            >>> query = "INSERT INTO sinkTable SELECT * FROM sourceTable"
+            >>> table_env.sql(source_ddl)
+            >>> table_env.sql(sink_ddl)
+            >>> table_env.sql(query)
+            >>> table_env.execute("MyJob")
 
         This code snippet creates a job to read data from Kafka source into a CSV sink.
 
@@ -298,57 +303,6 @@ class TableEnvironment(object):
             return None
         return Table(j_table)
 
-    def sql_query(self, query):
-        """
-        Evaluates a SQL query on registered tables and retrieves the result as a :class:`Table`.
-
-        All tables referenced by the query must be registered in the TableEnvironment.
-
-        A :class:`Table` is automatically registered when its :func:`~Table.__str__` method is
-        called, for example when it is embedded into a String.
-
-        Hence, SQL queries can directly reference a :class:`Table` as follows:
-        ::
-
-            >>> table = ...
-            # the table is not registered to the table environment
-            >>> table_env.sql_query("SELECT * FROM %s" % table)
-
-        :param query: The sql query string.
-        :return: The result :class:`Table`.
-        """
-        j_table = self._j_tenv.sqlQuery(query)
-        return Table(j_table)
-
-    def sql_update(self, stmt, query_config=None):
-        """
-        Evaluates a SQL statement such as INSERT, UPDATE or DELETE or a DDL statement
-
-        .. note::
-
-            Currently only SQL INSERT statements are supported.
-
-        All tables referenced by the query must be registered in the TableEnvironment.
-        A :class:`Table` is automatically registered when its :func:`~Table.__str__` method is
-        called, for example when it is embedded into a String.
-        Hence, SQL queries can directly reference a :class:`Table` as follows:
-        ::
-
-            # register the table sink into which the result is inserted.
-            >>> table_env.register_table_sink("sink_table", table_sink)
-            >>> source_table = ...
-            # source_table is not registered to the table environment
-            >>> table_env.sql_update("INSERT INTO sink_table SELECT * FROM %s" % source_table)
-
-        :param stmt: The SQL statement to evaluate.
-        :param query_config: The :class:`QueryConfig` to use.
-        """
-        # type: (str, QueryConfig) -> None
-        if query_config is not None:
-            self._j_tenv.sqlUpdate(stmt, query_config._j_query_config)
-        else:
-            self._j_tenv.sqlUpdate(stmt)
-
     def get_current_catalog(self):
         """
         Gets the current default catalog name of the current session.
diff --git a/flink-python/pyflink/table/tests/test_environment_completeness.py b/flink-python/pyflink/table/tests/test_environment_completeness.py
index 1f82445..fd95fc3 100644
--- a/flink-python/pyflink/table/tests/test_environment_completeness.py
+++ b/flink-python/pyflink/table/tests/test_environment_completeness.py
@@ -41,10 +41,11 @@ class EnvironmentAPICompletenessTests(PythonAPICompletenessTestCase, unittest.Te
         # registerExternalCatalog, getRegisteredExternalCatalog, registerCatalog, getCatalog and
         # listTables should be supported when catalog supported in python.
         # getCompletionHints has been deprecated. It will be removed in the next release.
+        # sqlQuery and sqlUpdate have been deprecated. They will be removed in the next release.
         # TODO add TableEnvironment#create method with EnvironmentSettings as a parameter
         return {'registerExternalCatalog', 'getRegisteredExternalCatalog', 'registerCatalog',
                 'getCatalog', 'registerFunction', 'listUserDefinedFunctions', 'listTables',
-                'getCompletionHints', 'create'}
+                'getCompletionHints', 'create', 'sqlQuery', 'sqlUpdate'}
 
 
 if __name__ == '__main__':
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py
index 4eba1bb..3e43d5f 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -113,7 +113,7 @@ class StreamTableEnvironmentTests(PyFlinkStreamTableTestCase):
             "sinks",
             source_sink_utils.TestAppendSink(field_names, field_types))
 
-        result = t_env.sql_query("select a + 1, b, c from %s" % source)
+        result = t_env.sql("select a + 1, b, c from %s" % source)
         result.insert_into("sinks")
         self.env.execute()
         actual = source_sink_utils.results()
@@ -130,26 +130,7 @@ class StreamTableEnvironmentTests(PyFlinkStreamTableTestCase):
             "sinks",
             source_sink_utils.TestAppendSink(field_names, field_types))
 
-        t_env.sql_update("insert into sinks select * from %s" % source)
-        self.env.execute("test_sql_job")
-
-        actual = source_sink_utils.results()
-        expected = ['1,Hi,Hello', '2,Hello,Hello']
-        self.assert_equals(actual, expected)
-
-    def test_sql_update_with_query_config(self):
-        t_env = self.t_env
-        source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
-        field_names = ["a", "b", "c"]
-        field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
-        t_env.register_table_sink(
-            "sinks",
-            source_sink_utils.TestAppendSink(field_names, field_types))
-        query_config = StreamQueryConfig()
-        query_config.with_idle_state_retention_time(
-            datetime.timedelta(days=1), datetime.timedelta(days=2))
-
-        t_env.sql_update("insert into sinks select * from %s" % source, query_config)
+        t_env.sql("insert into sinks select * from %s" % source)
         self.env.execute("test_sql_job")
 
         actual = source_sink_utils.results()