You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by he...@apache.org on 2020/05/29 05:55:18 UTC

[flink] branch release-1.11 updated: [FLINK-17303][python] Return TableResult for Python TableEnvironment

This is an automated email from the ASF dual-hosted git repository.

hequn pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 386cea7  [FLINK-17303][python] Return TableResult for Python TableEnvironment
386cea7 is described below

commit 386cea7beccf10fd5893ab2d960ec886a8acfdd0
Author: SteNicholas <pr...@163.com>
AuthorDate: Wed May 6 10:09:55 2020 +0800

    [FLINK-17303][python] Return TableResult for Python TableEnvironment
    
    This closes #12246.
---
 flink-python/pyflink/common/__init__.py            |   8 +
 flink-python/pyflink/common/completable_future.py  |  63 ++++++++
 flink-python/pyflink/common/execution_mode.py      |   2 +-
 flink-python/pyflink/common/job_client.py          | 114 ++++++++++++++
 .../pyflink/common/job_execution_result.py         |   3 +-
 .../pyflink/common/{__init__.py => job_id.py}      |  35 ++---
 flink-python/pyflink/common/job_status.py          | 167 +++++++++++++++++++++
 flink-python/pyflink/table/__init__.py             |  48 +++---
 .../{common/__init__.py => table/result_kind.py}   |  53 ++++---
 flink-python/pyflink/table/table_environment.py    |   4 +-
 flink-python/pyflink/table/table_result.py         |  69 +++++++++
 flink-python/pyflink/table/tests/test_sql.py       |  47 +++++-
 12 files changed, 542 insertions(+), 71 deletions(-)

diff --git a/flink-python/pyflink/common/__init__.py b/flink-python/pyflink/common/__init__.py
index fcea204..6bf412d 100644
--- a/flink-python/pyflink/common/__init__.py
+++ b/flink-python/pyflink/common/__init__.py
@@ -22,19 +22,27 @@ Important classes used by both Flink Streaming and Batch API:
     - :class:`ExecutionConfig`:
       A config to define the behavior of the program execution.
 """
+from pyflink.common.completable_future import CompletableFuture
 from pyflink.common.configuration import Configuration
 from pyflink.common.execution_config import ExecutionConfig
 from pyflink.common.execution_mode import ExecutionMode
 from pyflink.common.input_dependency_constraint import InputDependencyConstraint
+from pyflink.common.job_client import JobClient
 from pyflink.common.job_execution_result import JobExecutionResult
+from pyflink.common.job_id import JobID
+from pyflink.common.job_status import JobStatus
 from pyflink.common.restart_strategy import RestartStrategies, RestartStrategyConfiguration
 
 __all__ = [
+    'CompletableFuture',
     'Configuration',
     'ExecutionConfig',
     'ExecutionMode',
     'InputDependencyConstraint',
+    'JobClient',
     'JobExecutionResult',
+    'JobID',
+    'JobStatus',
     'RestartStrategies',
     'RestartStrategyConfiguration',
 ]
diff --git a/flink-python/pyflink/common/completable_future.py b/flink-python/pyflink/common/completable_future.py
new file mode 100644
index 0000000..266ea26
--- /dev/null
+++ b/flink-python/pyflink/common/completable_future.py
@@ -0,0 +1,63 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from asyncio import Future
+
+__all__ = ['CompletableFuture']
+
+
+class CompletableFuture(Future):
+    """
+    A Future that may be explicitly completed (setting its value and status), supporting dependent
+    functions and actions that trigger upon its completion.
+
+    When two or more threads attempt to set_result, set_exception, or cancel a CompletableFuture,
+    only one of them succeeds.
+    """
+
+    def __init__(self, j_completable_future, py_class=None):
+        super().__init__()
+        self._j_completable_future = j_completable_future
+        self._py_class = py_class
+
+    def cancel(self):
+        return self._j_completable_future.cancel(True)
+
+    def cancelled(self):
+        return self._j_completable_future.isCancelled()
+
+    def done(self):
+        return self._j_completable_future.isDone()
+
+    def result(self):
+        if self._py_class is None:
+            return self._j_completable_future.get()
+        else:
+            return self._py_class(self._j_completable_future.get())
+
+    def exception(self):
+        return self._exception
+
+    def set_result(self, result):
+        return self._j_completable_future.complete(result)
+
+    def set_exception(self, exception):
+        self._exception = exception
+        return self._j_completable_future.completeExceptionally(exception)
+
+    def __str__(self):
+        return self._j_completable_future.toString()
diff --git a/flink-python/pyflink/common/execution_mode.py b/flink-python/pyflink/common/execution_mode.py
index 936f9aa..f3d9966 100644
--- a/flink-python/pyflink/common/execution_mode.py
+++ b/flink-python/pyflink/common/execution_mode.py
@@ -83,7 +83,7 @@ class ExecutionMode(object):
         elif j_execution_mode == JExecutionMode.BATCH_FORCED:
             return ExecutionMode.BATCH_FORCED
         else:
-            raise Exception("Unsupported java exection mode: %s" % j_execution_mode)
+            raise Exception("Unsupported java execution mode: %s" % j_execution_mode)
 
     @staticmethod
     def _to_j_execution_mode(execution_mode):
diff --git a/flink-python/pyflink/common/job_client.py b/flink-python/pyflink/common/job_client.py
new file mode 100644
index 0000000..e4a1f39
--- /dev/null
+++ b/flink-python/pyflink/common/job_client.py
@@ -0,0 +1,114 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from pyflink.common.completable_future import CompletableFuture
+from pyflink.common.job_execution_result import JobExecutionResult
+from pyflink.common.job_id import JobID
+from pyflink.common.job_status import JobStatus
+
+__all__ = ['JobClient']
+
+
+class JobClient(object):
+    """
+    A client that is scoped to a specific job.
+    """
+
+    def __init__(self, j_job_client):
+        self._j_job_client = j_job_client
+
+    def get_job_id(self):
+        """
+        Returns the JobID that uniquely identifies the job this client is scoped to.
+
+        :return: JobID, or null if the job has been executed on a runtime without JobIDs
+                 or if the execution failed.
+        """
+        return JobID(self._j_job_client.getJobID())
+
+    def get_job_status(self):
+        """
+        Requests the JobStatus of the associated job.
+
+        :return: A CompletableFuture containing the JobStatus of the associated job.
+        :rtype: pyflink.common.CompletableFuture
+        """
+        return CompletableFuture(self._j_job_client.getJobStatus(), JobStatus)
+
+    def cancel(self):
+        """
+        Cancels the associated job.
+
+        :return: A CompletableFuture for canceling the associated job.
+        :rtype: pyflink.common.CompletableFuture
+        """
+        return CompletableFuture(self._j_job_client.cancel())
+
+    def stop_with_savepoint(self, advance_to_end_of_event_time, savepoint_directory):
+        """
+        Stops the associated job on Flink cluster.
+
+        Stopping works only for streaming programs. Be aware, that the job might continue to run
+        for a while after sending the stop command, because after sources stopped to emit data all
+        operators need to finish processing.
+
+        :param advance_to_end_of_event_time: Flag indicating if the source should inject a
+                                             MAX_WATERMARK in the pipeline.
+        :type advance_to_end_of_event_time: bool
+        :param savepoint_directory: Directory the savepoint should be written to.
+        :type savepoint_directory: str
+        :return: A CompletableFuture containing the path where the savepoint is located.
+        :rtype: pyflink.common.CompletableFuture
+        """
+        return CompletableFuture(
+            self._j_job_client.stopWithSavepoint(advance_to_end_of_event_time, savepoint_directory),
+            str)
+
+    def trigger_savepoint(self, savepoint_directory):
+        """
+        Triggers a savepoint for the associated job. The savepoint will be written to the given
+        savepoint directory.
+
+        :param savepoint_directory: Directory the savepoint should be written to.
+        :type savepoint_directory: str
+        :return: A CompletableFuture containing the path where the savepoint is located.
+        :rtype: pyflink.common.CompletableFuture
+        """
+        return CompletableFuture(self._j_job_client.triggerSavepoint(savepoint_directory), str)
+
+    def get_accumulators(self, class_loader):
+        """
+        Requests the accumulators of the associated job. Accumulators can be requested while it
+        is running or after it has finished. The class loader is used to deserialize the incoming
+        accumulator results.
+
+        :param class_loader: Class loader used to deserialize the incoming accumulator results.
+        :return: A CompletableFuture containing the accumulators of the associated job.
+        :rtype: pyflink.common.CompletableFuture
+        """
+        return CompletableFuture(self._j_job_client.getAccumulators(class_loader), dict)
+
+    def get_job_execution_result(self, user_class_loader):
+        """
+        Returns the JobExecutionResult result of the job execution of the submitted job.
+
+        :param user_class_loader: Class loader used to deserialize the accumulators of the job.
+        :return: A CompletableFuture containing the JobExecutionResult result of the job execution.
+        :rtype: pyflink.common.CompletableFuture
+        """
+        return CompletableFuture(self._j_job_client.getJobExecutionResult(user_class_loader),
+                                 JobExecutionResult)
diff --git a/flink-python/pyflink/common/job_execution_result.py b/flink-python/pyflink/common/job_execution_result.py
index 24926bc..26ea95e 100644
--- a/flink-python/pyflink/common/job_execution_result.py
+++ b/flink-python/pyflink/common/job_execution_result.py
@@ -15,6 +15,7 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
+from pyflink.common.job_id import JobID
 
 __all__ = ['JobExecutionResult']
 
@@ -35,7 +36,7 @@ class JobExecutionResult(object):
         :return: JobID, or null if the job has been executed on a runtime without JobIDs
                  or if the execution failed.
         """
-        return self._j_job_execution_result.getJobID()
+        return JobID(self._j_job_execution_result.getJobID())
 
     def is_job_execution_result(self):
         """
diff --git a/flink-python/pyflink/common/__init__.py b/flink-python/pyflink/common/job_id.py
similarity index 55%
copy from flink-python/pyflink/common/__init__.py
copy to flink-python/pyflink/common/job_id.py
index fcea204..49c1349 100644
--- a/flink-python/pyflink/common/__init__.py
+++ b/flink-python/pyflink/common/job_id.py
@@ -15,26 +15,21 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
+__all__ = ['JobID']
 
-"""
-Important classes used by both Flink Streaming and Batch API:
 
-    - :class:`ExecutionConfig`:
-      A config to define the behavior of the program execution.
-"""
-from pyflink.common.configuration import Configuration
-from pyflink.common.execution_config import ExecutionConfig
-from pyflink.common.execution_mode import ExecutionMode
-from pyflink.common.input_dependency_constraint import InputDependencyConstraint
-from pyflink.common.job_execution_result import JobExecutionResult
-from pyflink.common.restart_strategy import RestartStrategies, RestartStrategyConfiguration
+class JobID(object):
+    """
+    Unique (at least statistically unique) identifier for a Flink Job. Jobs in Flink correspond
+    to dataflow graphs.
 
-__all__ = [
-    'Configuration',
-    'ExecutionConfig',
-    'ExecutionMode',
-    'InputDependencyConstraint',
-    'JobExecutionResult',
-    'RestartStrategies',
-    'RestartStrategyConfiguration',
-]
+    Jobs act simultaneously as sessions, because jobs can be created and submitted incrementally
+    in different parts. Newer fragments of a graph can be attached to existing graphs, thereby
+    extending the current data flow graphs.
+    """
+
+    def __init__(self, j_job_id):
+        self._j_job_id = j_job_id
+
+    def __str__(self):
+        return self._j_job_id.toString()
diff --git a/flink-python/pyflink/common/job_status.py b/flink-python/pyflink/common/job_status.py
new file mode 100644
index 0000000..bbd4b33
--- /dev/null
+++ b/flink-python/pyflink/common/job_status.py
@@ -0,0 +1,167 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from pyflink.java_gateway import get_gateway
+
+__all__ = ['JobStatus']
+
+
+class JobStatus(object):
+    """
+    Possible states of a job once it has been accepted by the job manager.
+
+    :data:`CREATED`:
+
+    Job is newly created, no task has started to run.
+
+    :data:`RUNNING`:
+
+    Some tasks are scheduled or running, some may be pending, some may be finished.
+
+    :data:`FAILING`:
+
+    The job has failed and is currently waiting for the cleanup to complete.
+
+    :data:`FAILED`:
+
+    The job has failed with a non-recoverable task failure.
+
+    :data:`CANCELLING`:
+
+    Job is being cancelled.
+
+    :data:`CANCELED`:
+
+    Job has been cancelled.
+
+    :data:`FINISHED`:
+
+    All of the job's tasks have successfully finished.
+
+    :data:`RESTARTING`:
+
+    The job is currently undergoing a reset and total restart.
+
+    :data:`SUSPENDED`:
+
+    The job has been suspended which means that it has been stopped but not been removed from a
+    potential HA job store.
+
+    :data:`RECONCILING`:
+
+    The job is currently reconciling and waits for task execution report to recover state.
+    """
+
+    CREATED = 0
+    RUNNING = 1
+    FAILING = 2
+    FAILED = 3
+    CANCELLING = 4
+    CANCELED = 5
+    FINISHED = 6
+    RESTARTING = 7
+    SUSPENDED = 8
+    RECONCILING = 9
+
+    def __init__(self, j_job_status) -> None:
+        super().__init__()
+        self._j_job_status = j_job_status
+
+    def is_globally_terminal_state(self):
+        """
+        Checks whether this state is <i>globally terminal</i>. A globally terminal job
+        is complete and cannot fail any more and will not be restarted or recovered by another
+        standby master node.
+
+        When a globally terminal state has been reached, all recovery data for the job is
+        dropped from the high-availability services.
+
+        :return: ``True`` if this job status is globally terminal, ``False`` otherwise.
+        """
+        return self._j_job_status.isGloballyTerminalState()
+
+    def is_terminal_state(self):
+        """
+        Checks whether this state is locally terminal. Locally terminal refers to the
+        state of a job's execution graph within an executing JobManager. If the execution graph
+        is locally terminal, the JobManager will not continue executing or recovering the job.
+
+        The only state that is locally terminal, but not globally terminal is SUSPENDED,
+        which is typically entered when the executing JobManager looses its leader status.
+
+        :return: ``True`` if this job status is terminal, ``False`` otherwise.
+        """
+        return self._j_job_status.isTerminalState()
+
+    @staticmethod
+    def _from_j_job_status(j_job_status):
+        gateway = get_gateway()
+        JJobStatus = gateway.jvm.org.apache.flink.api.common.JobStatus
+        if j_job_status == JJobStatus.CREATED:
+            return JobStatus.CREATED
+        elif j_job_status == JJobStatus.RUNNING:
+            return JobStatus.RUNNING
+        elif j_job_status == JJobStatus.FAILING:
+            return JobStatus.FAILING
+        elif j_job_status == JJobStatus.FAILED:
+            return JobStatus.FAILED
+        elif j_job_status == JJobStatus.CANCELLING:
+            return JobStatus.CANCELLING
+        elif j_job_status == JJobStatus.CANCELED:
+            return JobStatus.CANCELED
+        elif j_job_status == JJobStatus.FINISHED:
+            return JobStatus.FINISHED
+        elif j_job_status == JJobStatus.RESTARTING:
+            return JobStatus.RESTARTING
+        elif j_job_status == JJobStatus.SUSPENDED:
+            return JobStatus.SUSPENDED
+        elif j_job_status == JJobStatus.RECONCILING:
+            return JobStatus.RECONCILING
+        else:
+            raise Exception("Unsupported java job status: %s" % j_job_status)
+
+    @staticmethod
+    def _to_j_job_status(job_status):
+        gateway = get_gateway()
+        JJobStatus = gateway.jvm.org.apache.flink.api.common.JobStatus
+        if job_status == JobStatus.CREATED:
+            return JJobStatus.CREATED
+        elif job_status == JobStatus.RUNNING:
+            return JJobStatus.RUNNING
+        elif job_status == JobStatus.FAILING:
+            return JJobStatus.FAILING
+        elif job_status == JobStatus.FAILED:
+            return JJobStatus.FAILED
+        elif job_status == JobStatus.CANCELLING:
+            return JJobStatus.CANCELLING
+        elif job_status == JobStatus.CANCELED:
+            return JJobStatus.CANCELED
+        elif job_status == JobStatus.FINISHED:
+            return JJobStatus.FINISHED
+        elif job_status == JobStatus.RESTARTING:
+            return JJobStatus.RESTARTING
+        elif job_status == JobStatus.SUSPENDED:
+            return JJobStatus.SUSPENDED
+        elif job_status == JobStatus.RECONCILING:
+            return JJobStatus.RECONCILING
+        else:
+            raise TypeError("Unsupported job status: %s, supported job statuses are: "
+                            "JobStatus.CREATED, JobStatus.RUNNING, "
+                            "JobStatus.FAILING, JobStatus.FAILED, "
+                            "JobStatus.CANCELLING, JobStatus.CANCELED, "
+                            "JobStatus.FINISHED, JobStatus.RESTARTING, "
+                            "JobStatus.SUSPENDED and JobStatus.RECONCILING." % job_status)
diff --git a/flink-python/pyflink/table/__init__.py b/flink-python/pyflink/table/__init__.py
index 010434c..12bdf73 100644
--- a/flink-python/pyflink/table/__init__.py
+++ b/flink-python/pyflink/table/__init__.py
@@ -61,43 +61,47 @@ Important classes of Flink Table API:
 from __future__ import absolute_import
 
 from pyflink.table.environment_settings import EnvironmentSettings
+from pyflink.table.explain_detail import ExplainDetail
+from pyflink.table.result_kind import ResultKind
+from pyflink.table.sinks import CsvTableSink, TableSink, WriteMode
+from pyflink.table.sources import CsvTableSource, TableSource
 from pyflink.table.sql_dialect import SqlDialect
-from pyflink.table.table import Table, GroupedTable, GroupWindowedTable, OverWindowedTable, \
+from pyflink.table.statement_set import StatementSet
+from pyflink.table.table import GroupWindowedTable, GroupedTable, OverWindowedTable, Table, \
     WindowGroupedTable
 from pyflink.table.table_config import TableConfig
 from pyflink.table.table_environment import (TableEnvironment, StreamTableEnvironment,
                                              BatchTableEnvironment)
-from pyflink.table.sinks import TableSink, CsvTableSink, WriteMode
-from pyflink.table.sources import TableSource, CsvTableSource
-from pyflink.table.types import DataTypes, UserDefinedType, Row
+from pyflink.table.table_result import TableResult
 from pyflink.table.table_schema import TableSchema
+from pyflink.table.types import DataTypes, UserDefinedType, Row
 from pyflink.table.udf import FunctionContext, ScalarFunction
-from pyflink.table.explain_detail import ExplainDetail
-from pyflink.table.statement_set import StatementSet
 
 __all__ = [
-    'TableEnvironment',
-    'StreamTableEnvironment',
     'BatchTableEnvironment',
+    'CsvTableSink',
+    'CsvTableSource',
+    'DataTypes',
     'EnvironmentSettings',
-    'Table',
-    'GroupedTable',
+    'ExplainDetail',
+    'FunctionContext',
     'GroupWindowedTable',
+    'GroupedTable',
     'OverWindowedTable',
-    'WindowGroupedTable',
+    'ResultKind',
+    'Row',
+    'ScalarFunction',
+    'SqlDialect',
+    'StatementSet',
+    'StreamTableEnvironment',
+    'Table',
     'TableConfig',
+    'TableEnvironment',
+    'TableResult',
+    'TableSchema',
     'TableSink',
     'TableSource',
-    'WriteMode',
-    'CsvTableSink',
-    'CsvTableSource',
-    'DataTypes',
     'UserDefinedType',
-    'Row',
-    'TableSchema',
-    'FunctionContext',
-    'ScalarFunction',
-    'SqlDialect',
-    'ExplainDetail',
-    'StatementSet'
+    'WindowGroupedTable',
+    'WriteMode'
 ]
diff --git a/flink-python/pyflink/common/__init__.py b/flink-python/pyflink/table/result_kind.py
similarity index 50%
copy from flink-python/pyflink/common/__init__.py
copy to flink-python/pyflink/table/result_kind.py
index fcea204..68c324d 100644
--- a/flink-python/pyflink/common/__init__.py
+++ b/flink-python/pyflink/table/result_kind.py
@@ -15,26 +15,35 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
+from pyflink.java_gateway import get_gateway
 
-"""
-Important classes used by both Flink Streaming and Batch API:
-
-    - :class:`ExecutionConfig`:
-      A config to define the behavior of the program execution.
-"""
-from pyflink.common.configuration import Configuration
-from pyflink.common.execution_config import ExecutionConfig
-from pyflink.common.execution_mode import ExecutionMode
-from pyflink.common.input_dependency_constraint import InputDependencyConstraint
-from pyflink.common.job_execution_result import JobExecutionResult
-from pyflink.common.restart_strategy import RestartStrategies, RestartStrategyConfiguration
-
-__all__ = [
-    'Configuration',
-    'ExecutionConfig',
-    'ExecutionMode',
-    'InputDependencyConstraint',
-    'JobExecutionResult',
-    'RestartStrategies',
-    'RestartStrategyConfiguration',
-]
+__all__ = ['ResultKind']
+
+
+class ResultKind(object):
+    """
+    ResultKind defines the types of the result.
+
+    :data:`SUCCESS`:
+
+    The statement (e.g. DDL, USE) executes successfully, and the result only contains a simple "OK".
+
+    :data:`SUCCESS_WITH_CONTENT`:
+
+    The statement (e.g. DML, DQL, SHOW) executes successfully, and the result contains important
+    content.
+    """
+
+    SUCCESS = 0
+    SUCCESS_WITH_CONTENT = 1
+
+    @staticmethod
+    def _from_j_result_kind(j_result_kind):
+        gateway = get_gateway()
+        JResultKind = gateway.jvm.org.apache.flink.table.api.ResultKind
+        if j_result_kind == JResultKind.SUCCESS:
+            return ResultKind.SUCCESS
+        elif j_result_kind == JResultKind.SUCCESS_WITH_CONTENT:
+            return ResultKind.SUCCESS_WITH_CONTENT
+        else:
+            raise Exception("Unsupported Java result kind: %s" % j_result_kind)
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index d2ec220..1d2be8f 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -34,6 +34,7 @@ from pyflink.table.descriptors import StreamTableDescriptor, BatchTableDescripto
 
 from pyflink.java_gateway import get_gateway
 from pyflink.table import Table
+from pyflink.table.table_result import TableResult
 from pyflink.table.types import _to_java_type, _create_type_verifier, RowType, DataType, \
     _infer_schema_from_data, _create_converter, from_arrow_type, RowField, create_arrow_schema
 from pyflink.util import utils
@@ -531,8 +532,7 @@ class TableEnvironment(object):
                 the affected row count for `DML` (-1 means unknown),
                 or a string message ("OK") for other statements.
         """
-        # TODO convert java TableResult to python TableResult once FLINK-17303 is finished
-        return self._j_tenv.executeSql(stmt)
+        return TableResult(self._j_tenv.executeSql(stmt))
 
     def create_statement_set(self):
         """
diff --git a/flink-python/pyflink/table/table_result.py b/flink-python/pyflink/table/table_result.py
new file mode 100644
index 0000000..5071424
--- /dev/null
+++ b/flink-python/pyflink/table/table_result.py
@@ -0,0 +1,69 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from pyflink.common.job_client import JobClient
+from pyflink.table.result_kind import ResultKind
+from pyflink.table.table_schema import TableSchema
+
+__all__ = ['TableResult']
+
+
+class TableResult(object):
+    """
+    A :class:`~pyflink.table.TableResult` is the representation of the statement execution result.
+    """
+
+    def __init__(self, j_table_result):
+        self._j_table_result = j_table_result
+
+    def get_job_client(self):
+        """
+        For DML and DQL statement, return the JobClient which associates the submitted Flink job.
+        For other statements (e.g.  DDL, DCL) return empty.
+
+        :return: The job client, optional.
+        :rtype: pyflink.common.JobClient
+        """
+        job_client = self._j_table_result.getJobClient()
+        if job_client.isPresent():
+            return JobClient(job_client.get())
+        else:
+            return None
+
+    def get_table_schema(self):
+        """
+        Get the schema of result.
+
+        :return: The schema of result.
+        :rtype: pyflink.table.TableSchema
+        """
+        return TableSchema(j_table_schema=self._j_table_result.getTableSchema())
+
+    def get_result_kind(self):
+        """
+        Return the ResultKind which represents the result type.
+
+        :return: The result kind.
+        :rtype: pyflink.table.ResultKind
+        """
+        return ResultKind._from_j_result_kind(self._j_table_result.getResultKind())
+
+    def print(self):
+        """
+        Print the result contents as tableau form to client console.
+        """
+        self._j_table_result.print()
diff --git a/flink-python/pyflink/table/tests/test_sql.py b/flink-python/pyflink/table/tests/test_sql.py
index 61de9b1..c8c0ee1 100644
--- a/flink-python/pyflink/table/tests/test_sql.py
+++ b/flink-python/pyflink/table/tests/test_sql.py
@@ -21,10 +21,8 @@ import subprocess
 import unittest
 
 from pyflink.find_flink_home import _find_flink_source_root
-
 from pyflink.java_gateway import get_gateway
-
-from pyflink.table import DataTypes
+from pyflink.table import DataTypes, ResultKind
 from pyflink.testing import source_sink_utils
 from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase, PyFlinkBatchTableTestCase
 
@@ -58,6 +56,49 @@ class StreamSqlTests(SqlTests, PyFlinkStreamTableTestCase):
         expected = ['2,Hi,Hello', '3,Hello,Hello']
         self.assert_equals(actual, expected)
 
+    def test_execute_sql(self):
+        t_env = self.t_env
+        table_result = t_env.execute_sql("create table tbl"
+                                         "("
+                                         "   a bigint,"
+                                         "   b int,"
+                                         "   c varchar"
+                                         ") with ("
+                                         "  'connector' = 'COLLECTION',"
+                                         "   'is-bounded' = 'false'"
+                                         ")")
+        self.assertIsNone(table_result.get_job_client())
+        self.assert_equals(table_result.get_table_schema().get_field_names(), ["result"])
+        self.assertEqual(table_result.get_result_kind(), ResultKind.SUCCESS)
+        table_result.print()
+
+        table_result = t_env.execute_sql("alter table tbl set ('k1' = 'a', 'k2' = 'b')")
+        self.assertIsNone(table_result.get_job_client())
+        self.assert_equals(table_result.get_table_schema().get_field_names(), ["result"])
+        self.assertEqual(table_result.get_result_kind(), ResultKind.SUCCESS)
+        table_result.print()
+
+        field_names = ["k1", "k2", "c"]
+        field_types = [DataTypes.BIGINT(), DataTypes.INT(), DataTypes.STRING()]
+        t_env.register_table_sink(
+            "sinks",
+            source_sink_utils.TestAppendSink(field_names, field_types))
+        table_result = t_env.execute_sql("insert into sinks select * from tbl")
+        job_execution_result = table_result.get_job_client().get_job_execution_result(
+            get_gateway().jvm.Thread.currentThread().getContextClassLoader()).result()
+        self.assertIsNotNone(job_execution_result.get_job_id())
+        self.assertIsNotNone(job_execution_result.get_job_execution_result())
+        self.assert_equals(table_result.get_table_schema().get_field_names(),
+                           ["default_catalog.default_database.sinks"])
+        self.assertEqual(table_result.get_result_kind(), ResultKind.SUCCESS_WITH_CONTENT)
+        table_result.print()
+
+        table_result = t_env.execute_sql("drop table tbl")
+        self.assertIsNone(table_result.get_job_client())
+        self.assert_equals(table_result.get_table_schema().get_field_names(), ["result"])
+        self.assertEqual(table_result.get_result_kind(), ResultKind.SUCCESS)
+        table_result.print()
+
     def test_sql_update(self):
         t_env = self.t_env
         source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])