You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ji...@apache.org on 2019/07/11 14:51:38 UTC

[flink] branch master updated: [hotfix][python] Use the TableEnvironment.execute() method instead of ExecutionEnvironment.execute()/StreamExecutionEnvironment.execute(). (#9087)

This is an automated email from the ASF dual-hosted git repository.

jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1aa43fc  [hotfix][python] Use the TableEnvironment.execute() method instead of ExecutionEnvironment.execute()/StreamExecutionEnvironment.execute(). (#9087)
1aa43fc is described below

commit 1aa43fc62341b4e70676ee5be2b67bd7136856e1
Author: WeiZhong94 <44...@users.noreply.github.com>
AuthorDate: Thu Jul 11 22:51:23 2019 +0800

    [hotfix][python] Use the TableEnvironment.execute() method instead of ExecutionEnvironment.execute()/StreamExecutionEnvironment.execute(). (#9087)
---
 docs/dev/table/common.md                           |  2 +-
 docs/dev/table/common.zh.md                        |  2 +-
 docs/dev/table/tableApi.md                         |  2 +-
 docs/dev/table/tableApi.zh.md                      |  2 +-
 docs/ops/python_shell.md                           |  4 +-
 docs/ops/python_shell.zh.md                        |  4 +-
 docs/tutorials/python_table_api.md                 |  6 +--
 docs/tutorials/python_table_api.zh.md              |  6 +--
 flink-python/pyflink/shell.py                      |  4 +-
 .../pyflink/table/examples/batch/word_count.py     |  2 +-
 flink-python/pyflink/table/table.py                |  2 +-
 flink-python/pyflink/table/table_environment.py    | 49 ++++++----------------
 flink-python/pyflink/table/tests/test_calc.py      |  2 +-
 .../pyflink/table/tests/test_descriptor.py         |  4 +-
 .../pyflink/table/tests/test_shell_example.py      |  4 +-
 .../table/tests/test_table_environment_api.py      |  6 +--
 16 files changed, 38 insertions(+), 63 deletions(-)

diff --git a/docs/dev/table/common.md b/docs/dev/table/common.md
index 5228ff2..c22fbc6 100644
--- a/docs/dev/table/common.md
+++ b/docs/dev/table/common.md
@@ -116,7 +116,7 @@ sql_result  = table_env.sql_query("SELECT ... FROM table2 ...")
 tapi_result.insert_into("outputTable")
 
 # execute
-env.execute()
+table_env.execute("python_job")
 
 {% endhighlight %}
 </div>
diff --git a/docs/dev/table/common.zh.md b/docs/dev/table/common.zh.md
index 5d6f9ae..1f0d6f6 100644
--- a/docs/dev/table/common.zh.md
+++ b/docs/dev/table/common.zh.md
@@ -116,7 +116,7 @@ sql_result  = table_env.sql_query("SELECT ... FROM table2 ...")
 tapi_result.insert_into("outputTable")
 
 # execute
-env.execute()
+table_env.execute("python_job")
 
 {% endhighlight %}
 </div>
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index 8dc33e1..c0a2842 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -119,7 +119,7 @@ orders = t_env.scan("Orders")  # schema (a, b, c, rowtime)
 
 orders.group_by("a").select("a, b.count as cnt").insert_into("result")
 
-env.execute()
+t_env.execute("python_job")
 
 {% endhighlight %}
 
diff --git a/docs/dev/table/tableApi.zh.md b/docs/dev/table/tableApi.zh.md
index 409a7b4..729a531 100644
--- a/docs/dev/table/tableApi.zh.md
+++ b/docs/dev/table/tableApi.zh.md
@@ -119,7 +119,7 @@ orders = t_env.scan("Orders")  # schema (a, b, c, rowtime)
 
 orders.group_by("a").select("a, b.count as cnt").insert_into("result")
 
-env.execute()
+t_env.execute("python_job")
 
 {% endhighlight %}
 
diff --git a/docs/ops/python_shell.md b/docs/ops/python_shell.md
index a52cd23..2e5a0cf 100644
--- a/docs/ops/python_shell.md
+++ b/docs/ops/python_shell.md
@@ -70,7 +70,7 @@ The example below is a simple program in the Python shell:
 ...     .register_table_sink("stream_sink")
 >>> t.select("a + 1, b, c")\
 ...     .insert_into("stream_sink")
->>> s_env.execute()
+>>> st_env.execute("stream_job")
 >>> # If the job runs in local mode, you can exec following code in Python shell to see the result:
 >>> with open(sink_path, 'r') as f:
 ...     print(f.read())
@@ -102,7 +102,7 @@ The example below is a simple program in the Python shell:
 ...     .register_table_sink("batch_sink")
 >>> t.select("a + 1, b, c")\
 ...     .insert_into("batch_sink")
->>> b_env.execute()
+>>> bt_env.execute("batch_job")
 >>> # If the job runs in local mode, you can exec following code in Python shell to see the result:
 >>> with open(sink_path, 'r') as f:
 ...     print(f.read())
diff --git a/docs/ops/python_shell.zh.md b/docs/ops/python_shell.zh.md
index 90440d8..2566c9e 100644
--- a/docs/ops/python_shell.zh.md
+++ b/docs/ops/python_shell.zh.md
@@ -69,7 +69,7 @@ bin/pyflink-shell.sh local
 ...     .register_table_sink("stream_sink")
 >>> t.select("a + 1, b, c")\
 ...     .insert_into("stream_sink")
->>> s_env.execute()
+>>> st_env.execute("stream_job")
 >>> # 如果作业运行在local模式, 你可以执行以下代码查看结果:
 >>> with open(sink_path, 'r') as f:
 ...     print(f.read())
@@ -101,7 +101,7 @@ bin/pyflink-shell.sh local
 ...     .register_table_sink("batch_sink")
 >>> t.select("a + 1, b, c")\
 ...     .insert_into("batch_sink")
->>> b_env.execute()
+>>> bt_env.execute("batch_job")
 >>> # 如果作业运行在local模式, 你可以执行以下代码查看结果:
 >>> with open(sink_path, 'r') as f:
 ...     print(f.read())
diff --git a/docs/tutorials/python_table_api.md b/docs/tutorials/python_table_api.md
index 8a6e866..89d3397 100644
--- a/docs/tutorials/python_table_api.md
+++ b/docs/tutorials/python_table_api.md
@@ -94,11 +94,11 @@ t_env.scan('mySource') \
 
 The last thing is to start the actual Flink Python Table API job. All operations, such as
 creating sources, transformations and sinks only build up a graph of internal operations.
-Only when `exec_env.execute()` is called, this graph of operations will be thrown on a cluster or
+Only when `t_env.execute(job_name)` is called will this graph of operations be submitted to a cluster or
 executed on your local machine.
 
 {% highlight python %}
-exec_env.execute()
+t_env.execute("tutorial_job")
 {% endhighlight %}
 
 The complete code so far is as follows:
@@ -136,7 +136,7 @@ t_env.scan('mySource') \
     .select('word, count(1)') \
     .insert_into('mySink')
 
-exec_env.execute()
+t_env.execute("tutorial_job")
 {% endhighlight %}
 
 ## Executing a Flink Python Table API Program
diff --git a/docs/tutorials/python_table_api.zh.md b/docs/tutorials/python_table_api.zh.md
index 81fc598..bab36e1 100644
--- a/docs/tutorials/python_table_api.zh.md
+++ b/docs/tutorials/python_table_api.zh.md
@@ -86,11 +86,11 @@ t_env.scan('mySource') \
 {% endhighlight %}
 
 最后,需要做的就是启动Flink Python Table API作业。上面所有的操作,比如创建源表
-进行变换以及写入结果表的操作都只是构建作业逻辑图,只有当`exec_env.execute()`被调用的时候,
+进行变换以及写入结果表的操作都只是构建作业逻辑图,只有当`t_env.execute(job_name)`被调用的时候,
 作业才会被真正提交到集群或者本地进行执行。
 
 {% highlight python %}
-exec_env.execute()
+t_env.execute("python_job")
 {% endhighlight %}
 
 该教程的完整代码如下:
@@ -128,7 +128,7 @@ t_env.scan('mySource') \
     .select('word, count(1)') \
     .insert_into('mySink')
 
-exec_env.execute()
+t_env.execute("python_job")
 {% endhighlight %}
 
 ## 执行一个Flink Python Table API程序
diff --git a/flink-python/pyflink/shell.py b/flink-python/pyflink/shell.py
index 97b7065..87eb21d 100644
--- a/flink-python/pyflink/shell.py
+++ b/flink-python/pyflink/shell.py
@@ -109,7 +109,7 @@ NOTE: Use the prebound Table Environment to implement batch or streaming Table p
     *
     * t.select("a + 1, b, c").insert_into("batch_sink")
     *
-    * b_env.execute()
+    * bt_env.execute("batch_job")
 
   Streaming - Use 's_env' and 'st_env' variables
 
@@ -139,7 +139,7 @@ NOTE: Use the prebound Table Environment to implement batch or streaming Table p
     * 
     * t.select("a + 1, b, c").insert_into("stream_sink")
     *
-    * s_env.execute()
+    * st_env.execute("stream_job")
 '''
 utf8_out.write(welcome_msg)
 
diff --git a/flink-python/pyflink/table/examples/batch/word_count.py b/flink-python/pyflink/table/examples/batch/word_count.py
index c3bc1e2..39d9958 100644
--- a/flink-python/pyflink/table/examples/batch/word_count.py
+++ b/flink-python/pyflink/table/examples/batch/word_count.py
@@ -70,7 +70,7 @@ def word_count():
          .select("word, count(1) as count") \
          .insert_into("Results")
 
-    env.execute()
+    t_env.execute("word_count")
 
 
 if __name__ == '__main__':
diff --git a/flink-python/pyflink/table/table.py b/flink-python/pyflink/table/table.py
index fda8ebf..bb336f4 100644
--- a/flink-python/pyflink/table/table.py
+++ b/flink-python/pyflink/table/table.py
@@ -52,7 +52,7 @@ class Table(object):
         >>> ...
         >>> t_env.register_table_sink("result", ...)
         >>> t.insert_into("result")
-        >>> env.execute()
+        >>> t_env.execute("table_job")
 
     Operations such as :func:`~pyflink.table.Table.join`, :func:`~pyflink.table.Table.select`,
     :func:`~pyflink.table.Table.where` and :func:`~pyflink.table.Table.group_by`
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index 5def53a..b5619e8 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -496,17 +496,20 @@ class TableEnvironment(object):
         Triggers the program execution. The environment will execute all parts of
         the program.
 
-        <p>The program execution will be logged and displayed with the provided name
+        The program execution will be logged and displayed with the provided name.
 
-        <p><b>NOTE:</b>It is highly advised to set all parameters in the :class:`TableConfig`
-        on the very beginning of the program. It is undefined what configurations values will
-        be used for the execution if queries are mixed with config changes. It depends on
-        the characteristic of the particular parameter. For some of them the value from the
-        point in time of query construction (e.g. the currentCatalog) will be used. On the
-        other hand some values might be evaluated according to the state from the time when
-        this method is called (e.g. timeZone).
+        .. note::
+
+            It is highly advised to set all parameters in the :class:`TableConfig`
+            at the very beginning of the program. It is undefined which configuration values will
+            be used for the execution if queries are mixed with config changes. It depends on
+            the characteristic of the particular parameter. For some of them the value from the
+            point in time of query construction (e.g. the current catalog) will be used. On the
+            other hand some values might be evaluated according to the state from the time when
+            this method is called (e.g. timezone).
 
-        :param job_name Desired name of the job
+        :param job_name: Desired name of the job.
+        :type job_name: str
         """
         self._j_tenv.execute(job_name)
 
@@ -687,20 +690,6 @@ class StreamTableEnvironment(TableEnvironment):
         return StreamTableDescriptor(
             self._j_tenv.connect(connector_descriptor._j_connector_descriptor))
 
-    def execute(self, job_name):
-        """
-        Triggers the program execution. The environment will execute all parts of
-        the program.
-
-        The program execution will be logged and displayed with the provided name
-
-        It calls the StreamExecutionEnvironment#execute on the underlying
-        :class:`StreamExecutionEnvironment`. This environment translates queries eagerly.
-
-        :param job_name Desired name of the job
-        """
-        self._j_tenv.execute(job_name)
-
     @staticmethod
     def create(stream_execution_environment, table_config=None):
         """
@@ -788,20 +777,6 @@ class BatchTableEnvironment(TableEnvironment):
         return BatchTableDescriptor(
             self._j_tenv.connect(connector_descriptor._j_connector_descriptor))
 
-    def execute(self, job_name):
-        """
-        Triggers the program execution. The environment will execute all parts of
-        the program.
-
-        The program execution will be logged and displayed with the provided name
-
-        It calls the ExecutionEnvironment#execute on the underlying
-        :class:`ExecutionEnvironment`. This environment translates queries eagerly.
-
-        :param job_name Desired name of the job
-        """
-        self._j_tenv.execute(job_name)
-
     @staticmethod
     def create(execution_environment, table_config=None):
         """
diff --git a/flink-python/pyflink/table/tests/test_calc.py b/flink-python/pyflink/table/tests/test_calc.py
index 7df6e68..b67e083 100644
--- a/flink-python/pyflink/table/tests/test_calc.py
+++ b/flink-python/pyflink/table/tests/test_calc.py
@@ -97,7 +97,7 @@ class StreamTableCalcTests(PyFlinkStreamTableTestCase):
               PythonOnlyPoint(3.0, 4.0))],
             schema)
         t.insert_into("Results")
-        self.env.execute()
+        self.t_env.execute("test")
         actual = source_sink_utils.results()
 
         expected = ['1,1.0,hi,hello,1970-01-02,01:00:00,1970-01-02 00:00:00.0,'
diff --git a/flink-python/pyflink/table/tests/test_descriptor.py b/flink-python/pyflink/table/tests/test_descriptor.py
index 1c0f36e..ddbcb01 100644
--- a/flink-python/pyflink/table/tests/test_descriptor.py
+++ b/flink-python/pyflink/table/tests/test_descriptor.py
@@ -989,7 +989,7 @@ class AbstractTableDescriptorTests(object):
         t_env.scan("source") \
              .select("a + 1, b, c") \
              .insert_into("sink")
-        self.env.execute()
+        self.t_env.execute("test")
 
         with open(sink_path, 'r') as f:
             lines = f.read()
@@ -1031,7 +1031,7 @@ class AbstractTableDescriptorTests(object):
         t_env.scan("source") \
              .select("a + 1, b, c") \
              .insert_into("sink")
-        self.env.execute()
+        self.t_env.execute("test")
 
         with open(sink_path, 'r') as f:
             lines = f.read()
diff --git a/flink-python/pyflink/table/tests/test_shell_example.py b/flink-python/pyflink/table/tests/test_shell_example.py
index 97bce08..48b7817 100644
--- a/flink-python/pyflink/table/tests/test_shell_example.py
+++ b/flink-python/pyflink/table/tests/test_shell_example.py
@@ -52,7 +52,7 @@ class ShellExampleTests(PyFlinkTestCase):
 
         t.select("a + 1, b, c").insert_into("batch_sink")
 
-        b_env.execute()
+        bt_env.execute("batch_job")
 
         # verify code, do not copy these code to shell.py
         with open(sink_path, 'r') as f:
@@ -88,7 +88,7 @@ class ShellExampleTests(PyFlinkTestCase):
 
         t.select("a + 1, b, c").insert_into("stream_sink")
 
-        s_env.execute()
+        st_env.execute("stream_job")
 
         # verify code, do not copy these code to shell.py
         with open(sink_path, 'r') as f:
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py
index 2a92e9b..54188ff 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -53,7 +53,7 @@ class StreamTableEnvironmentTests(PyFlinkStreamTableTestCase):
             source_sink_utils.TestAppendSink(field_names, field_types))
 
         t_env.from_elements([(1, "Hi", "Hello")], ["a", "b", "c"]).insert_into("Sinks")
-        self.env.execute()
+        self.t_env.execute("test")
         actual = source_sink_utils.results()
 
         expected = ['1,Hi,Hello']
@@ -114,7 +114,7 @@ class StreamTableEnvironmentTests(PyFlinkStreamTableTestCase):
 
         result = t_env.sql_query("select a + 1, b, c from %s" % source)
         result.insert_into("sinks")
-        self.env.execute()
+        self.t_env.execute("test")
         actual = source_sink_utils.results()
 
         expected = ['2,Hi,Hello', '3,Hello,Hello']
@@ -130,7 +130,7 @@ class StreamTableEnvironmentTests(PyFlinkStreamTableTestCase):
             source_sink_utils.TestAppendSink(field_names, field_types))
 
         t_env.sql_update("insert into sinks select * from %s" % source)
-        self.env.execute("test_sql_job")
+        self.t_env.execute("test_sql_job")
 
         actual = source_sink_utils.results()
         expected = ['1,Hi,Hello', '2,Hello,Hello']