You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2019/07/10 05:30:28 UTC
[flink] branch master updated: [hotfix][python] Fix the
documentation issue
This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c3eb3a3 [hotfix][python] Fix the documentation issue
c3eb3a3 is described below
commit c3eb3a32d2a70ed4eaafa6a77b62d659519522ab
Author: Dian Fu <fu...@alibaba-inc.com>
AuthorDate: Wed Jul 10 11:54:29 2019 +0800
[hotfix][python] Fix the documentation issue
---
flink-python/pyflink/table/table_environment.py | 130 +++++++--------------
.../table/tests/test_environment_completeness.py | 3 +-
.../table/tests/test_table_environment_api.py | 23 +---
3 files changed, 46 insertions(+), 110 deletions(-)
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index 4b248f9..f3f6497 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -21,7 +21,6 @@ from abc import ABCMeta, abstractmethod
from pyflink.serializers import BatchedSerializer, PickleSerializer
from pyflink.table.catalog import Catalog
-from pyflink.table.query_config import QueryConfig
from pyflink.table.table_config import TableConfig
from pyflink.table.descriptors import (StreamTableDescriptor, ConnectorDescriptor,
BatchTableDescriptor)
@@ -244,50 +243,56 @@ class TableEnvironment(object):
A DDL statement can be executed to create/drop a table/view:
For example, the below DDL statement would create a CSV table named `tbl1`
- into the current catalog:
+ into the current catalog::
- create table tbl1(
- a int,
- b bigint,
- c varchar
- ) with (
- connector = 'csv',
- csv.path = 'xxx'
- )
-
- The returns table format for different kind of statement:
- DDL: returns null.
- DML: a sql insert returns null; a sql query(select) returns a table
- to describe the query data set, it can be further queried through the Table API,
- or directly write to sink with `~Table.insert_into`.
-
- SQL queries can directly execute as follows:
- ::
- >>> sinkDDL =
- "create table sinkTable(
+ create table tbl1(
a int,
- b varchar
+ b bigint,
+ c varchar
) with (
connector = 'csv',
csv.path = 'xxx'
- )"
+ )
- >>> sourceDDL =
- "create table sourceTable(
- a int,
- b varchar
- ) with (
- connector = 'kafka',
- kafka.topic = 'xxx',
- kafka.endpoint = 'x.x.x'
- )"
+ The result table format for different kinds of statements:
- query = "INSERT INTO sinkTable SELECT FROM sourceTable"
+ DDL: returns None.
- tEnv.sql(sourceDDL)
- tEnv.sql(sinkDDL)
- tEnv.sql(query)
- tEnv.execute("MyJob")
+ DML: a SQL INSERT returns None; a SQL query (SELECT) returns a table
+ describing the query result set, which can be further queried through the Table API,
+ or written directly to a sink with :func:`Table.insert_into`.
+
+ SQL queries can be executed directly as follows:
+ ::
+
+ >>> source_ddl = \\
+ ... '''
+ ... create table sourceTable(
+ ... a int,
+ ... b varchar
+ ... ) with (
+ ... connector = 'kafka',
+ ... kafka.topic = 'xxx',
+ ... kafka.endpoint = 'x.x.x'
+ ... )
+ ... '''
+
+ >>> sink_ddl = \\
+ ... '''
+ ... create table sinkTable(
+ ... a int,
+ ... b varchar
+ ... ) with (
+ ... connector = 'csv',
+ ... csv.path = 'xxx'
+ ... )
+ ... '''
+
+ >>> query = "INSERT INTO sinkTable SELECT * FROM sourceTable"
+ >>> table_env.sql(source_ddl)
+ >>> table_env.sql(sink_ddl)
+ >>> table_env.sql(query)
+ >>> table_env.execute("MyJob")
This code snippet creates a job to read data from Kafka source into a CSV sink.
@@ -298,57 +303,6 @@ class TableEnvironment(object):
return None
return Table(j_table)
- def sql_query(self, query):
- """
- Evaluates a SQL query on registered tables and retrieves the result as a :class:`Table`.
-
- All tables referenced by the query must be registered in the TableEnvironment.
-
- A :class:`Table` is automatically registered when its :func:`~Table.__str__` method is
- called, for example when it is embedded into a String.
-
- Hence, SQL queries can directly reference a :class:`Table` as follows:
- ::
-
- >>> table = ...
- # the table is not registered to the table environment
- >>> table_env.sql_query("SELECT * FROM %s" % table)
-
- :param query: The sql query string.
- :return: The result :class:`Table`.
- """
- j_table = self._j_tenv.sqlQuery(query)
- return Table(j_table)
-
- def sql_update(self, stmt, query_config=None):
- """
- Evaluates a SQL statement such as INSERT, UPDATE or DELETE or a DDL statement
-
- .. note::
-
- Currently only SQL INSERT statements are supported.
-
- All tables referenced by the query must be registered in the TableEnvironment.
- A :class:`Table` is automatically registered when its :func:`~Table.__str__` method is
- called, for example when it is embedded into a String.
- Hence, SQL queries can directly reference a :class:`Table` as follows:
- ::
-
- # register the table sink into which the result is inserted.
- >>> table_env.register_table_sink("sink_table", table_sink)
- >>> source_table = ...
- # source_table is not registered to the table environment
- >>> table_env.sql_update("INSERT INTO sink_table SELECT * FROM %s" % source_table)
-
- :param stmt: The SQL statement to evaluate.
- :param query_config: The :class:`QueryConfig` to use.
- """
- # type: (str, QueryConfig) -> None
- if query_config is not None:
- self._j_tenv.sqlUpdate(stmt, query_config._j_query_config)
- else:
- self._j_tenv.sqlUpdate(stmt)
-
def get_current_catalog(self):
"""
Gets the current default catalog name of the current session.
diff --git a/flink-python/pyflink/table/tests/test_environment_completeness.py b/flink-python/pyflink/table/tests/test_environment_completeness.py
index 1f82445..fd95fc3 100644
--- a/flink-python/pyflink/table/tests/test_environment_completeness.py
+++ b/flink-python/pyflink/table/tests/test_environment_completeness.py
@@ -41,10 +41,11 @@ class EnvironmentAPICompletenessTests(PythonAPICompletenessTestCase, unittest.Te
# registerExternalCatalog, getRegisteredExternalCatalog, registerCatalog, getCatalog and
# listTables should be supported when catalog supported in python.
# getCompletionHints has been deprecated. It will be removed in the next release.
+ # sqlQuery and sqlUpdate have been deprecated. They will be removed in the next release.
# TODO add TableEnvironment#create method with EnvironmentSettings as a parameter
return {'registerExternalCatalog', 'getRegisteredExternalCatalog', 'registerCatalog',
'getCatalog', 'registerFunction', 'listUserDefinedFunctions', 'listTables',
- 'getCompletionHints', 'create'}
+ 'getCompletionHints', 'create', 'sqlQuery', 'sqlUpdate'}
if __name__ == '__main__':
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py
index 4eba1bb..3e43d5f 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -113,7 +113,7 @@ class StreamTableEnvironmentTests(PyFlinkStreamTableTestCase):
"sinks",
source_sink_utils.TestAppendSink(field_names, field_types))
- result = t_env.sql_query("select a + 1, b, c from %s" % source)
+ result = t_env.sql("select a + 1, b, c from %s" % source)
result.insert_into("sinks")
self.env.execute()
actual = source_sink_utils.results()
@@ -130,26 +130,7 @@ class StreamTableEnvironmentTests(PyFlinkStreamTableTestCase):
"sinks",
source_sink_utils.TestAppendSink(field_names, field_types))
- t_env.sql_update("insert into sinks select * from %s" % source)
- self.env.execute("test_sql_job")
-
- actual = source_sink_utils.results()
- expected = ['1,Hi,Hello', '2,Hello,Hello']
- self.assert_equals(actual, expected)
-
- def test_sql_update_with_query_config(self):
- t_env = self.t_env
- source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
- field_names = ["a", "b", "c"]
- field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
- t_env.register_table_sink(
- "sinks",
- source_sink_utils.TestAppendSink(field_names, field_types))
- query_config = StreamQueryConfig()
- query_config.with_idle_state_retention_time(
- datetime.timedelta(days=1), datetime.timedelta(days=2))
-
- t_env.sql_update("insert into sinks select * from %s" % source, query_config)
+ t_env.sql("insert into sinks select * from %s" % source)
self.env.execute("test_sql_job")
actual = source_sink_utils.results()