You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ji...@apache.org on 2019/07/11 14:51:38 UTC
[flink] branch master updated: [hotfix][python] Use the
TableEnvironment.execute() method instead of
ExecutionEnvironment.execute()/StreamExecutionEnvironment.execute().
(#9087)
This is an automated email from the ASF dual-hosted git repository.
jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 1aa43fc [hotfix][python] Use the TableEnvironment.execute() method instead of ExecutionEnvironment.execute()/StreamExecutionEnvironment.execute(). (#9087)
1aa43fc is described below
commit 1aa43fc62341b4e70676ee5be2b67bd7136856e1
Author: WeiZhong94 <44...@users.noreply.github.com>
AuthorDate: Thu Jul 11 22:51:23 2019 +0800
[hotfix][python] Use the TableEnvironment.execute() method instead of ExecutionEnvironment.execute()/StreamExecutionEnvironment.execute(). (#9087)
---
docs/dev/table/common.md | 2 +-
docs/dev/table/common.zh.md | 2 +-
docs/dev/table/tableApi.md | 2 +-
docs/dev/table/tableApi.zh.md | 2 +-
docs/ops/python_shell.md | 4 +-
docs/ops/python_shell.zh.md | 4 +-
docs/tutorials/python_table_api.md | 6 +--
docs/tutorials/python_table_api.zh.md | 6 +--
flink-python/pyflink/shell.py | 4 +-
.../pyflink/table/examples/batch/word_count.py | 2 +-
flink-python/pyflink/table/table.py | 2 +-
flink-python/pyflink/table/table_environment.py | 49 ++++++----------------
flink-python/pyflink/table/tests/test_calc.py | 2 +-
.../pyflink/table/tests/test_descriptor.py | 4 +-
.../pyflink/table/tests/test_shell_example.py | 4 +-
.../table/tests/test_table_environment_api.py | 6 +--
16 files changed, 38 insertions(+), 63 deletions(-)
diff --git a/docs/dev/table/common.md b/docs/dev/table/common.md
index 5228ff2..c22fbc6 100644
--- a/docs/dev/table/common.md
+++ b/docs/dev/table/common.md
@@ -116,7 +116,7 @@ sql_result = table_env.sql_query("SELECT ... FROM table2 ...")
tapi_result.insert_into("outputTable")
# execute
-env.execute()
+table_env.execute("python_job")
{% endhighlight %}
</div>
diff --git a/docs/dev/table/common.zh.md b/docs/dev/table/common.zh.md
index 5d6f9ae..1f0d6f6 100644
--- a/docs/dev/table/common.zh.md
+++ b/docs/dev/table/common.zh.md
@@ -116,7 +116,7 @@ sql_result = table_env.sql_query("SELECT ... FROM table2 ...")
tapi_result.insert_into("outputTable")
# execute
-env.execute()
+table_env.execute("python_job")
{% endhighlight %}
</div>
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index 8dc33e1..c0a2842 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -119,7 +119,7 @@ orders = t_env.scan("Orders") # schema (a, b, c, rowtime)
orders.group_by("a").select("a, b.count as cnt").insert_into("result")
-env.execute()
+t_env.execute("python_job")
{% endhighlight %}
diff --git a/docs/dev/table/tableApi.zh.md b/docs/dev/table/tableApi.zh.md
index 409a7b4..729a531 100644
--- a/docs/dev/table/tableApi.zh.md
+++ b/docs/dev/table/tableApi.zh.md
@@ -119,7 +119,7 @@ orders = t_env.scan("Orders") # schema (a, b, c, rowtime)
orders.group_by("a").select("a, b.count as cnt").insert_into("result")
-env.execute()
+t_env.execute("python_job")
{% endhighlight %}
diff --git a/docs/ops/python_shell.md b/docs/ops/python_shell.md
index a52cd23..2e5a0cf 100644
--- a/docs/ops/python_shell.md
+++ b/docs/ops/python_shell.md
@@ -70,7 +70,7 @@ The example below is a simple program in the Python shell:
... .register_table_sink("stream_sink")
>>> t.select("a + 1, b, c")\
... .insert_into("stream_sink")
->>> s_env.execute()
+>>> st_env.execute("stream_job")
>>> # If the job runs in local mode, you can exec following code in Python shell to see the result:
>>> with open(sink_path, 'r') as f:
... print(f.read())
@@ -102,7 +102,7 @@ The example below is a simple program in the Python shell:
... .register_table_sink("batch_sink")
>>> t.select("a + 1, b, c")\
... .insert_into("batch_sink")
->>> b_env.execute()
+>>> bt_env.execute("batch_job")
>>> # If the job runs in local mode, you can exec following code in Python shell to see the result:
>>> with open(sink_path, 'r') as f:
... print(f.read())
diff --git a/docs/ops/python_shell.zh.md b/docs/ops/python_shell.zh.md
index 90440d8..2566c9e 100644
--- a/docs/ops/python_shell.zh.md
+++ b/docs/ops/python_shell.zh.md
@@ -69,7 +69,7 @@ bin/pyflink-shell.sh local
... .register_table_sink("stream_sink")
>>> t.select("a + 1, b, c")\
... .insert_into("stream_sink")
->>> s_env.execute()
+>>> st_env.execute("stream_job")
>>> # 如果作业运行在local模式, 你可以执行以下代码查看结果:
>>> with open(sink_path, 'r') as f:
... print(f.read())
@@ -101,7 +101,7 @@ bin/pyflink-shell.sh local
... .register_table_sink("batch_sink")
>>> t.select("a + 1, b, c")\
... .insert_into("batch_sink")
->>> b_env.execute()
+>>> bt_env.execute("batch_job")
>>> # 如果作业运行在local模式, 你可以执行以下代码查看结果:
>>> with open(sink_path, 'r') as f:
... print(f.read())
diff --git a/docs/tutorials/python_table_api.md b/docs/tutorials/python_table_api.md
index 8a6e866..89d3397 100644
--- a/docs/tutorials/python_table_api.md
+++ b/docs/tutorials/python_table_api.md
@@ -94,11 +94,11 @@ t_env.scan('mySource') \
The last thing is to start the actual Flink Python Table API job. All operations, such as
creating sources, transformations and sinks only build up a graph of internal operations.
-Only when `exec_env.execute()` is called, this graph of operations will be thrown on a cluster or
+Only when `t_env.execute(job_name)` is called, this graph of operations will be submitted to a cluster or
executed on your local machine.
{% highlight python %}
-exec_env.execute()
+t_env.execute("tutorial_job")
{% endhighlight %}
The complete code so far is as follows:
@@ -136,7 +136,7 @@ t_env.scan('mySource') \
.select('word, count(1)') \
.insert_into('mySink')
-exec_env.execute()
+t_env.execute("tutorial_job")
{% endhighlight %}
## Executing a Flink Python Table API Program
diff --git a/docs/tutorials/python_table_api.zh.md b/docs/tutorials/python_table_api.zh.md
index 81fc598..bab36e1 100644
--- a/docs/tutorials/python_table_api.zh.md
+++ b/docs/tutorials/python_table_api.zh.md
@@ -86,11 +86,11 @@ t_env.scan('mySource') \
{% endhighlight %}
最后,需要做的就是启动Flink Python Table API作业。上面所有的操作,比如创建源表
-进行变换以及写入结果表的操作都只是构建作业逻辑图,只有当`exec_env.execute()`被调用的时候,
+进行变换以及写入结果表的操作都只是构建作业逻辑图,只有当`t_env.execute(job_name)`被调用的时候,
作业才会被真正提交到集群或者本地进行执行。
{% highlight python %}
-exec_env.execute()
+t_env.execute("python_job")
{% endhighlight %}
该教程的完整代码如下:
@@ -128,7 +128,7 @@ t_env.scan('mySource') \
.select('word, count(1)') \
.insert_into('mySink')
-exec_env.execute()
+t_env.execute("python_job")
{% endhighlight %}
## 执行一个Flink Python Table API程序
diff --git a/flink-python/pyflink/shell.py b/flink-python/pyflink/shell.py
index 97b7065..87eb21d 100644
--- a/flink-python/pyflink/shell.py
+++ b/flink-python/pyflink/shell.py
@@ -109,7 +109,7 @@ NOTE: Use the prebound Table Environment to implement batch or streaming Table p
*
* t.select("a + 1, b, c").insert_into("batch_sink")
*
- * b_env.execute()
+ * bt_env.execute("batch_job")
Streaming - Use 's_env' and 'st_env' variables
@@ -139,7 +139,7 @@ NOTE: Use the prebound Table Environment to implement batch or streaming Table p
*
* t.select("a + 1, b, c").insert_into("stream_sink")
*
- * s_env.execute()
+ * st_env.execute("stream_job")
'''
utf8_out.write(welcome_msg)
diff --git a/flink-python/pyflink/table/examples/batch/word_count.py b/flink-python/pyflink/table/examples/batch/word_count.py
index c3bc1e2..39d9958 100644
--- a/flink-python/pyflink/table/examples/batch/word_count.py
+++ b/flink-python/pyflink/table/examples/batch/word_count.py
@@ -70,7 +70,7 @@ def word_count():
.select("word, count(1) as count") \
.insert_into("Results")
- env.execute()
+ t_env.execute("word_count")
if __name__ == '__main__':
diff --git a/flink-python/pyflink/table/table.py b/flink-python/pyflink/table/table.py
index fda8ebf..bb336f4 100644
--- a/flink-python/pyflink/table/table.py
+++ b/flink-python/pyflink/table/table.py
@@ -52,7 +52,7 @@ class Table(object):
>>> ...
>>> t_env.register_table_sink("result", ...)
>>> t.insert_into("result")
- >>> env.execute()
+ >>> t_env.execute("table_job")
Operations such as :func:`~pyflink.table.Table.join`, :func:`~pyflink.table.Table.select`,
:func:`~pyflink.table.Table.where` and :func:`~pyflink.table.Table.group_by`
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index 5def53a..b5619e8 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -496,17 +496,20 @@ class TableEnvironment(object):
Triggers the program execution. The environment will execute all parts of
the program.
- <p>The program execution will be logged and displayed with the provided name
+ The program execution will be logged and displayed with the provided name.
- <p><b>NOTE:</b>It is highly advised to set all parameters in the :class:`TableConfig`
- on the very beginning of the program. It is undefined what configurations values will
- be used for the execution if queries are mixed with config changes. It depends on
- the characteristic of the particular parameter. For some of them the value from the
- point in time of query construction (e.g. the currentCatalog) will be used. On the
- other hand some values might be evaluated according to the state from the time when
- this method is called (e.g. timeZone).
+ .. note::
+
+ It is highly advised to set all parameters in the :class:`TableConfig`
+ at the very beginning of the program. It is undefined which configuration values will
+ be used for the execution if queries are mixed with config changes. It depends on
+ the characteristics of the particular parameter. For some of them the value from the
+ point in time of query construction (e.g. the current catalog) will be used. On the
+ other hand, some values might be evaluated according to the state at the time when
+ this method is called (e.g. timezone).
- :param job_name Desired name of the job
+ :param job_name: Desired name of the job.
+ :type job_name: str
"""
self._j_tenv.execute(job_name)
@@ -687,20 +690,6 @@ class StreamTableEnvironment(TableEnvironment):
return StreamTableDescriptor(
self._j_tenv.connect(connector_descriptor._j_connector_descriptor))
- def execute(self, job_name):
- """
- Triggers the program execution. The environment will execute all parts of
- the program.
-
- The program execution will be logged and displayed with the provided name
-
- It calls the StreamExecutionEnvironment#execute on the underlying
- :class:`StreamExecutionEnvironment`. This environment translates queries eagerly.
-
- :param job_name Desired name of the job
- """
- self._j_tenv.execute(job_name)
-
@staticmethod
def create(stream_execution_environment, table_config=None):
"""
@@ -788,20 +777,6 @@ class BatchTableEnvironment(TableEnvironment):
return BatchTableDescriptor(
self._j_tenv.connect(connector_descriptor._j_connector_descriptor))
- def execute(self, job_name):
- """
- Triggers the program execution. The environment will execute all parts of
- the program.
-
- The program execution will be logged and displayed with the provided name
-
- It calls the ExecutionEnvironment#execute on the underlying
- :class:`ExecutionEnvironment`. This environment translates queries eagerly.
-
- :param job_name Desired name of the job
- """
- self._j_tenv.execute(job_name)
-
@staticmethod
def create(execution_environment, table_config=None):
"""
diff --git a/flink-python/pyflink/table/tests/test_calc.py b/flink-python/pyflink/table/tests/test_calc.py
index 7df6e68..b67e083 100644
--- a/flink-python/pyflink/table/tests/test_calc.py
+++ b/flink-python/pyflink/table/tests/test_calc.py
@@ -97,7 +97,7 @@ class StreamTableCalcTests(PyFlinkStreamTableTestCase):
PythonOnlyPoint(3.0, 4.0))],
schema)
t.insert_into("Results")
- self.env.execute()
+ self.t_env.execute("test")
actual = source_sink_utils.results()
expected = ['1,1.0,hi,hello,1970-01-02,01:00:00,1970-01-02 00:00:00.0,'
diff --git a/flink-python/pyflink/table/tests/test_descriptor.py b/flink-python/pyflink/table/tests/test_descriptor.py
index 1c0f36e..ddbcb01 100644
--- a/flink-python/pyflink/table/tests/test_descriptor.py
+++ b/flink-python/pyflink/table/tests/test_descriptor.py
@@ -989,7 +989,7 @@ class AbstractTableDescriptorTests(object):
t_env.scan("source") \
.select("a + 1, b, c") \
.insert_into("sink")
- self.env.execute()
+ self.t_env.execute("test")
with open(sink_path, 'r') as f:
lines = f.read()
@@ -1031,7 +1031,7 @@ class AbstractTableDescriptorTests(object):
t_env.scan("source") \
.select("a + 1, b, c") \
.insert_into("sink")
- self.env.execute()
+ self.t_env.execute("test")
with open(sink_path, 'r') as f:
lines = f.read()
diff --git a/flink-python/pyflink/table/tests/test_shell_example.py b/flink-python/pyflink/table/tests/test_shell_example.py
index 97bce08..48b7817 100644
--- a/flink-python/pyflink/table/tests/test_shell_example.py
+++ b/flink-python/pyflink/table/tests/test_shell_example.py
@@ -52,7 +52,7 @@ class ShellExampleTests(PyFlinkTestCase):
t.select("a + 1, b, c").insert_into("batch_sink")
- b_env.execute()
+ bt_env.execute("batch_job")
# verify code, do not copy these code to shell.py
with open(sink_path, 'r') as f:
@@ -88,7 +88,7 @@ class ShellExampleTests(PyFlinkTestCase):
t.select("a + 1, b, c").insert_into("stream_sink")
- s_env.execute()
+ st_env.execute("stream_job")
# verify code, do not copy these code to shell.py
with open(sink_path, 'r') as f:
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py
index 2a92e9b..54188ff 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -53,7 +53,7 @@ class StreamTableEnvironmentTests(PyFlinkStreamTableTestCase):
source_sink_utils.TestAppendSink(field_names, field_types))
t_env.from_elements([(1, "Hi", "Hello")], ["a", "b", "c"]).insert_into("Sinks")
- self.env.execute()
+ self.t_env.execute("test")
actual = source_sink_utils.results()
expected = ['1,Hi,Hello']
@@ -114,7 +114,7 @@ class StreamTableEnvironmentTests(PyFlinkStreamTableTestCase):
result = t_env.sql_query("select a + 1, b, c from %s" % source)
result.insert_into("sinks")
- self.env.execute()
+ self.t_env.execute("test")
actual = source_sink_utils.results()
expected = ['2,Hi,Hello', '3,Hello,Hello']
@@ -130,7 +130,7 @@ class StreamTableEnvironmentTests(PyFlinkStreamTableTestCase):
source_sink_utils.TestAppendSink(field_names, field_types))
t_env.sql_update("insert into sinks select * from %s" % source)
- self.env.execute("test_sql_job")
+ self.t_env.execute("test_sql_job")
actual = source_sink_utils.results()
expected = ['1,Hi,Hello', '2,Hello,Hello']