You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by di...@apache.org on 2020/06/09 12:13:24 UTC
[flink] branch release-1.11 updated: [FLINK-18126][python] Correct
the exception handling of the Python CompletableFuture
This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new e042c81 [FLINK-18126][python] Correct the exception handling of the Python CompletableFuture
e042c81 is described below
commit e042c8136765c567dbf5c55c4c8ca2557d9a48dd
Author: Dian Fu <di...@apache.org>
AuthorDate: Thu Jun 4 22:44:04 2020 +0800
[FLINK-18126][python] Correct the exception handling of the Python CompletableFuture
This closes #12488.
---
flink-python/pyflink/common/completable_future.py | 48 +++++++++++++++-------
flink-python/pyflink/common/job_client.py | 13 ++++--
.../pyflink/common/job_execution_result.py | 2 +-
.../dataset/tests/test_execution_environment.py | 2 +-
.../tests/test_stream_execution_environment.py | 2 +-
flink-python/pyflink/util/exceptions.py | 17 ++++++++
6 files changed, 63 insertions(+), 21 deletions(-)
diff --git a/flink-python/pyflink/common/completable_future.py b/flink-python/pyflink/common/completable_future.py
index 18b9f56..6edee94 100644
--- a/flink-python/pyflink/common/completable_future.py
+++ b/flink-python/pyflink/common/completable_future.py
@@ -15,12 +15,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-from asyncio import Future
+
+from py4j.protocol import Py4JJavaError
+
+from pyflink.util.exceptions import convert_py4j_exception
__all__ = ['CompletableFuture']
-class CompletableFuture(Future):
+class CompletableFuture(object):
"""
A Future that may be explicitly completed (setting its value and status), supporting dependent
functions and actions that trigger upon its completion.
@@ -32,34 +35,51 @@ class CompletableFuture(Future):
"""
def __init__(self, j_completable_future, py_class=None):
- super().__init__()
self._j_completable_future = j_completable_future
self._py_class = py_class
- def cancel(self):
+ def cancel(self) -> bool:
+ """
+ Completes this CompletableFuture if not already completed.
+
+ :return: true if this task is now cancelled
+ """
return self._j_completable_future.cancel(True)
- def cancelled(self):
+ def cancelled(self) -> bool:
+ """
+ Returns true if this CompletableFuture was cancelled before it completed normally.
+ """
return self._j_completable_future.isCancelled()
- def done(self):
+ def done(self) -> bool:
+ """
+ Returns true if completed in any fashion: normally, exceptionally, or via cancellation.
+ """
return self._j_completable_future.isDone()
def result(self):
+ """
+ Waits if necessary for this future to complete, and then returns its result.
+
+ :return: the result value
+ """
if self._py_class is None:
return self._j_completable_future.get()
else:
return self._py_class(self._j_completable_future.get())
def exception(self):
- return self._exception
-
- def set_result(self, result):
- return self._j_completable_future.complete(result)
-
- def set_exception(self, exception):
- self._exception = exception
- return self._j_completable_future.completeExceptionally(exception)
+ """
+ Returns the exception that was set on this future or None if no exception was set.
+ """
+ if self._j_completable_future.isCompletedExceptionally():
+ try:
+ self._j_completable_future.getNow(None)
+ except Py4JJavaError as e:
+ return convert_py4j_exception(e)
+ else:
+ return None
def __str__(self):
return self._j_completable_future.toString()
diff --git a/flink-python/pyflink/common/job_client.py b/flink-python/pyflink/common/job_client.py
index 9bdfa62..1de2d1a 100644
--- a/flink-python/pyflink/common/job_client.py
+++ b/flink-python/pyflink/common/job_client.py
@@ -19,6 +19,7 @@ from pyflink.common.completable_future import CompletableFuture
from pyflink.common.job_execution_result import JobExecutionResult
from pyflink.common.job_id import JobID
from pyflink.common.job_status import JobStatus
+from pyflink.java_gateway import get_gateway
__all__ = ['JobClient']
@@ -66,7 +67,7 @@ class JobClient(object):
"""
return CompletableFuture(self._j_job_client.cancel())
- def stop_with_savepoint(self, advance_to_end_of_event_time, savepoint_directory):
+ def stop_with_savepoint(self, advance_to_end_of_event_time, savepoint_directory=None):
"""
Stops the associated job on Flink cluster.
@@ -88,7 +89,7 @@ class JobClient(object):
self._j_job_client.stopWithSavepoint(advance_to_end_of_event_time, savepoint_directory),
str)
- def trigger_savepoint(self, savepoint_directory):
+ def trigger_savepoint(self, savepoint_directory=None):
"""
Triggers a savepoint for the associated job. The savepoint will be written to the given
savepoint directory.
@@ -102,7 +103,7 @@ class JobClient(object):
"""
return CompletableFuture(self._j_job_client.triggerSavepoint(savepoint_directory), str)
- def get_accumulators(self, class_loader):
+ def get_accumulators(self, class_loader=None):
"""
Requests the accumulators of the associated job. Accumulators can be requested while it
is running or after it has finished. The class loader is used to deserialize the incoming
@@ -114,9 +115,11 @@ class JobClient(object):
.. versionadded:: 1.11.0
"""
+ if class_loader is None:
+ class_loader = get_gateway().jvm.Thread.currentThread().getContextClassLoader()
return CompletableFuture(self._j_job_client.getAccumulators(class_loader), dict)
- def get_job_execution_result(self, user_class_loader):
+ def get_job_execution_result(self, user_class_loader=None):
"""
Returns the JobExecutionResult result of the job execution of the submitted job.
@@ -126,5 +129,7 @@ class JobClient(object):
.. versionadded:: 1.11.0
"""
+ if user_class_loader is None:
+ user_class_loader = get_gateway().jvm.Thread.currentThread().getContextClassLoader()
return CompletableFuture(self._j_job_client.getJobExecutionResult(user_class_loader),
JobExecutionResult)
diff --git a/flink-python/pyflink/common/job_execution_result.py b/flink-python/pyflink/common/job_execution_result.py
index 2462237..bef45a8 100644
--- a/flink-python/pyflink/common/job_execution_result.py
+++ b/flink-python/pyflink/common/job_execution_result.py
@@ -104,7 +104,7 @@ class JobExecutionResult(object):
accumulators[key] = j_result_map[key]
return accumulators
- def to_string(self):
+ def __str__(self):
"""
Convert JobExecutionResult to a string, if possible.
diff --git a/flink-python/pyflink/dataset/tests/test_execution_environment.py b/flink-python/pyflink/dataset/tests/test_execution_environment.py
index 32a6bd2..77c1550 100644
--- a/flink-python/pyflink/dataset/tests/test_execution_environment.py
+++ b/flink-python/pyflink/dataset/tests/test_execution_environment.py
@@ -135,4 +135,4 @@ class ExecutionEnvironmentTests(PyFlinkTestCase):
self.assertIsNotNone(execution_result.get_net_runtime())
self.assertEqual(len(execution_result.get_all_accumulator_results()), 0)
self.assertIsNone(execution_result.get_accumulator_result('accumulator'))
- self.assertIsNotNone(execution_result.to_string())
+ self.assertIsNotNone(str(execution_result))
diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
index 01f73bf..d27df00 100644
--- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
@@ -212,4 +212,4 @@ class StreamExecutionEnvironmentTests(PyFlinkTestCase):
self.assertIsNotNone(execution_result.get_net_runtime())
self.assertEqual(len(execution_result.get_all_accumulator_results()), 0)
self.assertIsNone(execution_result.get_accumulator_result('accumulator'))
- self.assertIsNotNone(execution_result.to_string())
+ self.assertIsNotNone(str(execution_result))
diff --git a/flink-python/pyflink/util/exceptions.py b/flink-python/pyflink/util/exceptions.py
index c3ee65e..d1eedf8 100644
--- a/flink-python/pyflink/util/exceptions.py
+++ b/flink-python/pyflink/util/exceptions.py
@@ -184,3 +184,20 @@ def install_py4j_hooks():
"command line argument '--jarfile' or the config option 'pipeline.jars'" % self._fqn)
setattr(py4j.java_gateway.JavaPackage, '__call__', wrapped_call)
+
+
+def convert_py4j_exception(e: Py4JJavaError) -> JavaException:
+ """
+ Convert Py4J exception to JavaException.
+ """
+ def extract_java_stack_trace(java_stack_trace):
+ return '\n\t at '.join(map(lambda x: x.toString(), java_stack_trace))
+
+ s = e.java_exception.toString()
+ cause = e.java_exception.getCause()
+ stack_trace = extract_java_stack_trace(e.java_exception.getStackTrace())
+ while cause is not None:
+ stack_trace += '\nCaused by: %s: %s' % (cause.getClass().getName(), cause.getMessage())
+ stack_trace += "\n\t at " + extract_java_stack_trace(cause.getStackTrace())
+ cause = cause.getCause()
+ return JavaException(s.split(': ', 1)[1], stack_trace)