You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by di...@apache.org on 2020/06/09 12:13:24 UTC

[flink] branch release-1.11 updated: [FLINK-18126][python] Correct the exception handling of the Python CompletableFuture

This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new e042c81  [FLINK-18126][python] Correct the exception handling of the Python CompletableFuture
e042c81 is described below

commit e042c8136765c567dbf5c55c4c8ca2557d9a48dd
Author: Dian Fu <di...@apache.org>
AuthorDate: Thu Jun 4 22:44:04 2020 +0800

    [FLINK-18126][python] Correct the exception handling of the Python CompletableFuture
    
    This closes #12488.
---
 flink-python/pyflink/common/completable_future.py  | 48 +++++++++++++++-------
 flink-python/pyflink/common/job_client.py          | 13 ++++--
 .../pyflink/common/job_execution_result.py         |  2 +-
 .../dataset/tests/test_execution_environment.py    |  2 +-
 .../tests/test_stream_execution_environment.py     |  2 +-
 flink-python/pyflink/util/exceptions.py            | 17 ++++++++
 6 files changed, 63 insertions(+), 21 deletions(-)

diff --git a/flink-python/pyflink/common/completable_future.py b/flink-python/pyflink/common/completable_future.py
index 18b9f56..6edee94 100644
--- a/flink-python/pyflink/common/completable_future.py
+++ b/flink-python/pyflink/common/completable_future.py
@@ -15,12 +15,15 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-from asyncio import Future
+
+from py4j.protocol import Py4JJavaError
+
+from pyflink.util.exceptions import convert_py4j_exception
 
 __all__ = ['CompletableFuture']
 
 
-class CompletableFuture(Future):
+class CompletableFuture(object):
     """
     A Future that may be explicitly completed (setting its value and status), supporting dependent
     functions and actions that trigger upon its completion.
@@ -32,34 +35,51 @@ class CompletableFuture(Future):
     """
 
     def __init__(self, j_completable_future, py_class=None):
-        super().__init__()
         self._j_completable_future = j_completable_future
         self._py_class = py_class
 
-    def cancel(self):
+    def cancel(self) -> bool:
+        """
+        If not already completed, completes this CompletableFuture with a CancellationException.
+
+        :return: True if this task is now cancelled
+        """
         return self._j_completable_future.cancel(True)
 
-    def cancelled(self):
+    def cancelled(self) -> bool:
+        """
+        Returns true if this CompletableFuture was cancelled before it completed normally.
+        """
         return self._j_completable_future.isCancelled()
 
-    def done(self):
+    def done(self) -> bool:
+        """
+        Returns true if completed in any fashion: normally, exceptionally, or via cancellation.
+        """
         return self._j_completable_future.isDone()
 
     def result(self):
+        """
+        Waits if necessary for this future to complete, and then returns its result.
+
+        :return: the result value
+        """
         if self._py_class is None:
             return self._j_completable_future.get()
         else:
             return self._py_class(self._j_completable_future.get())
 
     def exception(self):
-        return self._exception
-
-    def set_result(self, result):
-        return self._j_completable_future.complete(result)
-
-    def set_exception(self, exception):
-        self._exception = exception
-        return self._j_completable_future.completeExceptionally(exception)
+        """
+        Returns the exception that was set on this future or None if no exception was set.
+        """
+        if self._j_completable_future.isCompletedExceptionally():
+            try:
+                self._j_completable_future.getNow(None)
+            except Py4JJavaError as e:
+                return convert_py4j_exception(e)
+        else:
+            return None
 
     def __str__(self):
         return self._j_completable_future.toString()
diff --git a/flink-python/pyflink/common/job_client.py b/flink-python/pyflink/common/job_client.py
index 9bdfa62..1de2d1a 100644
--- a/flink-python/pyflink/common/job_client.py
+++ b/flink-python/pyflink/common/job_client.py
@@ -19,6 +19,7 @@ from pyflink.common.completable_future import CompletableFuture
 from pyflink.common.job_execution_result import JobExecutionResult
 from pyflink.common.job_id import JobID
 from pyflink.common.job_status import JobStatus
+from pyflink.java_gateway import get_gateway
 
 __all__ = ['JobClient']
 
@@ -66,7 +67,7 @@ class JobClient(object):
         """
         return CompletableFuture(self._j_job_client.cancel())
 
-    def stop_with_savepoint(self, advance_to_end_of_event_time, savepoint_directory):
+    def stop_with_savepoint(self, advance_to_end_of_event_time, savepoint_directory=None):
         """
         Stops the associated job on Flink cluster.
 
@@ -88,7 +89,7 @@ class JobClient(object):
             self._j_job_client.stopWithSavepoint(advance_to_end_of_event_time, savepoint_directory),
             str)
 
-    def trigger_savepoint(self, savepoint_directory):
+    def trigger_savepoint(self, savepoint_directory=None):
         """
         Triggers a savepoint for the associated job. The savepoint will be written to the given
         savepoint directory.
@@ -102,7 +103,7 @@ class JobClient(object):
         """
         return CompletableFuture(self._j_job_client.triggerSavepoint(savepoint_directory), str)
 
-    def get_accumulators(self, class_loader):
+    def get_accumulators(self, class_loader=None):
         """
         Requests the accumulators of the associated job. Accumulators can be requested while it
         is running or after it has finished. The class loader is used to deserialize the incoming
@@ -114,9 +115,11 @@ class JobClient(object):
 
         .. versionadded:: 1.11.0
         """
+        if class_loader is None:
+            class_loader = get_gateway().jvm.Thread.currentThread().getContextClassLoader()
         return CompletableFuture(self._j_job_client.getAccumulators(class_loader), dict)
 
-    def get_job_execution_result(self, user_class_loader):
+    def get_job_execution_result(self, user_class_loader=None):
         """
         Returns the JobExecutionResult result of the job execution of the submitted job.
 
@@ -126,5 +129,7 @@ class JobClient(object):
 
         .. versionadded:: 1.11.0
         """
+        if user_class_loader is None:
+            user_class_loader = get_gateway().jvm.Thread.currentThread().getContextClassLoader()
         return CompletableFuture(self._j_job_client.getJobExecutionResult(user_class_loader),
                                  JobExecutionResult)
diff --git a/flink-python/pyflink/common/job_execution_result.py b/flink-python/pyflink/common/job_execution_result.py
index 2462237..bef45a8 100644
--- a/flink-python/pyflink/common/job_execution_result.py
+++ b/flink-python/pyflink/common/job_execution_result.py
@@ -104,7 +104,7 @@ class JobExecutionResult(object):
             accumulators[key] = j_result_map[key]
         return accumulators
 
-    def to_string(self):
+    def __str__(self):
         """
         Convert JobExecutionResult to a string, if possible.
 
diff --git a/flink-python/pyflink/dataset/tests/test_execution_environment.py b/flink-python/pyflink/dataset/tests/test_execution_environment.py
index 32a6bd2..77c1550 100644
--- a/flink-python/pyflink/dataset/tests/test_execution_environment.py
+++ b/flink-python/pyflink/dataset/tests/test_execution_environment.py
@@ -135,4 +135,4 @@ class ExecutionEnvironmentTests(PyFlinkTestCase):
         self.assertIsNotNone(execution_result.get_net_runtime())
         self.assertEqual(len(execution_result.get_all_accumulator_results()), 0)
         self.assertIsNone(execution_result.get_accumulator_result('accumulator'))
-        self.assertIsNotNone(execution_result.to_string())
+        self.assertIsNotNone(str(execution_result))
diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
index 01f73bf..d27df00 100644
--- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
@@ -212,4 +212,4 @@ class StreamExecutionEnvironmentTests(PyFlinkTestCase):
         self.assertIsNotNone(execution_result.get_net_runtime())
         self.assertEqual(len(execution_result.get_all_accumulator_results()), 0)
         self.assertIsNone(execution_result.get_accumulator_result('accumulator'))
-        self.assertIsNotNone(execution_result.to_string())
+        self.assertIsNotNone(str(execution_result))
diff --git a/flink-python/pyflink/util/exceptions.py b/flink-python/pyflink/util/exceptions.py
index c3ee65e..d1eedf8 100644
--- a/flink-python/pyflink/util/exceptions.py
+++ b/flink-python/pyflink/util/exceptions.py
@@ -184,3 +184,20 @@ def install_py4j_hooks():
             "command line argument '--jarfile' or the config option 'pipeline.jars'" % self._fqn)
 
     setattr(py4j.java_gateway.JavaPackage, '__call__', wrapped_call)
+
+
+def convert_py4j_exception(e: Py4JJavaError) -> JavaException:
+    """
+    Convert Py4J exception to JavaException.
+    """
+    def extract_java_stack_trace(java_stack_trace):
+        return '\n\t at '.join(map(lambda x: x.toString(), java_stack_trace))
+
+    s = e.java_exception.toString()
+    cause = e.java_exception.getCause()
+    stack_trace = extract_java_stack_trace(e.java_exception.getStackTrace())
+    while cause is not None:
+        stack_trace += '\nCaused by: %s: %s' % (cause.getClass().getName(), cause.getMessage())
+        stack_trace += "\n\t at " + extract_java_stack_trace(cause.getStackTrace())
+        cause = cause.getCause()
+    return JavaException(s.split(': ', 1)[1], stack_trace)