You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by he...@apache.org on 2020/05/29 05:55:18 UTC
[flink] branch release-1.11 updated: [FLINK-17303][python] Return
TableResult for Python TableEnvironment
This is an automated email from the ASF dual-hosted git repository.
hequn pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 386cea7 [FLINK-17303][python] Return TableResult for Python TableEnvironment
386cea7 is described below
commit 386cea7beccf10fd5893ab2d960ec886a8acfdd0
Author: SteNicholas <pr...@163.com>
AuthorDate: Wed May 6 10:09:55 2020 +0800
[FLINK-17303][python] Return TableResult for Python TableEnvironment
This closes #12246.
---
flink-python/pyflink/common/__init__.py | 8 +
flink-python/pyflink/common/completable_future.py | 63 ++++++++
flink-python/pyflink/common/execution_mode.py | 2 +-
flink-python/pyflink/common/job_client.py | 114 ++++++++++++++
.../pyflink/common/job_execution_result.py | 3 +-
.../pyflink/common/{__init__.py => job_id.py} | 35 ++---
flink-python/pyflink/common/job_status.py | 167 +++++++++++++++++++++
flink-python/pyflink/table/__init__.py | 48 +++---
.../{common/__init__.py => table/result_kind.py} | 53 ++++---
flink-python/pyflink/table/table_environment.py | 4 +-
flink-python/pyflink/table/table_result.py | 69 +++++++++
flink-python/pyflink/table/tests/test_sql.py | 47 +++++-
12 files changed, 542 insertions(+), 71 deletions(-)
diff --git a/flink-python/pyflink/common/__init__.py b/flink-python/pyflink/common/__init__.py
index fcea204..6bf412d 100644
--- a/flink-python/pyflink/common/__init__.py
+++ b/flink-python/pyflink/common/__init__.py
@@ -22,19 +22,27 @@ Important classes used by both Flink Streaming and Batch API:
- :class:`ExecutionConfig`:
A config to define the behavior of the program execution.
"""
+from pyflink.common.completable_future import CompletableFuture
from pyflink.common.configuration import Configuration
from pyflink.common.execution_config import ExecutionConfig
from pyflink.common.execution_mode import ExecutionMode
from pyflink.common.input_dependency_constraint import InputDependencyConstraint
+from pyflink.common.job_client import JobClient
from pyflink.common.job_execution_result import JobExecutionResult
+from pyflink.common.job_id import JobID
+from pyflink.common.job_status import JobStatus
from pyflink.common.restart_strategy import RestartStrategies, RestartStrategyConfiguration
__all__ = [
+ 'CompletableFuture',
'Configuration',
'ExecutionConfig',
'ExecutionMode',
'InputDependencyConstraint',
+ 'JobClient',
'JobExecutionResult',
+ 'JobID',
+ 'JobStatus',
'RestartStrategies',
'RestartStrategyConfiguration',
]
diff --git a/flink-python/pyflink/common/completable_future.py b/flink-python/pyflink/common/completable_future.py
new file mode 100644
index 0000000..266ea26
--- /dev/null
+++ b/flink-python/pyflink/common/completable_future.py
@@ -0,0 +1,63 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from asyncio import Future
+
+__all__ = ['CompletableFuture']
+
+
+class CompletableFuture(Future):
+ """
+ A Future that may be explicitly completed (setting its value and status), supporting dependent
+ functions and actions that trigger upon its completion.
+
+ When two or more threads attempt to set_result, set_exception, or cancel a CompletableFuture,
+ only one of them succeeds.
+ """
+
+ def __init__(self, j_completable_future, py_class=None):
+ super().__init__()
+ self._j_completable_future = j_completable_future
+ self._py_class = py_class
+
+ def cancel(self):
+ return self._j_completable_future.cancel(True)
+
+ def cancelled(self):
+ return self._j_completable_future.isCancelled()
+
+ def done(self):
+ return self._j_completable_future.isDone()
+
+ def result(self):
+ if self._py_class is None:
+ return self._j_completable_future.get()
+ else:
+ return self._py_class(self._j_completable_future.get())
+
+ def exception(self):
+ return self._exception
+
+ def set_result(self, result):
+ return self._j_completable_future.complete(result)
+
+ def set_exception(self, exception):
+ self._exception = exception
+ return self._j_completable_future.completeExceptionally(exception)
+
+ def __str__(self):
+ return self._j_completable_future.toString()
diff --git a/flink-python/pyflink/common/execution_mode.py b/flink-python/pyflink/common/execution_mode.py
index 936f9aa..f3d9966 100644
--- a/flink-python/pyflink/common/execution_mode.py
+++ b/flink-python/pyflink/common/execution_mode.py
@@ -83,7 +83,7 @@ class ExecutionMode(object):
elif j_execution_mode == JExecutionMode.BATCH_FORCED:
return ExecutionMode.BATCH_FORCED
else:
- raise Exception("Unsupported java exection mode: %s" % j_execution_mode)
+ raise Exception("Unsupported java execution mode: %s" % j_execution_mode)
@staticmethod
def _to_j_execution_mode(execution_mode):
diff --git a/flink-python/pyflink/common/job_client.py b/flink-python/pyflink/common/job_client.py
new file mode 100644
index 0000000..e4a1f39
--- /dev/null
+++ b/flink-python/pyflink/common/job_client.py
@@ -0,0 +1,114 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from pyflink.common.completable_future import CompletableFuture
+from pyflink.common.job_execution_result import JobExecutionResult
+from pyflink.common.job_id import JobID
+from pyflink.common.job_status import JobStatus
+
+__all__ = ['JobClient']
+
+
+class JobClient(object):
+ """
+ A client that is scoped to a specific job.
+ """
+
+ def __init__(self, j_job_client):
+ self._j_job_client = j_job_client
+
+ def get_job_id(self):
+ """
+ Returns the JobID that uniquely identifies the job this client is scoped to.
+
+ :return: JobID, or null if the job has been executed on a runtime without JobIDs
+ or if the execution failed.
+ """
+ return JobID(self._j_job_client.getJobID())
+
+ def get_job_status(self):
+ """
+ Requests the JobStatus of the associated job.
+
+ :return: A CompletableFuture containing the JobStatus of the associated job.
+ :rtype: pyflink.common.CompletableFuture
+ """
+ return CompletableFuture(self._j_job_client.getJobStatus(), JobStatus)
+
+ def cancel(self):
+ """
+ Cancels the associated job.
+
+ :return: A CompletableFuture for canceling the associated job.
+ :rtype: pyflink.common.CompletableFuture
+ """
+ return CompletableFuture(self._j_job_client.cancel())
+
+ def stop_with_savepoint(self, advance_to_end_of_event_time, savepoint_directory):
+ """
+ Stops the associated job on Flink cluster.
+
+ Stopping works only for streaming programs. Be aware, that the job might continue to run
+ for a while after sending the stop command, because after sources stopped to emit data all
+ operators need to finish processing.
+
+ :param advance_to_end_of_event_time: Flag indicating if the source should inject a
+ MAX_WATERMARK in the pipeline.
+ :type advance_to_end_of_event_time: bool
+ :param savepoint_directory: Directory the savepoint should be written to.
+ :type savepoint_directory: str
+ :return: A CompletableFuture containing the path where the savepoint is located.
+ :rtype: pyflink.common.CompletableFuture
+ """
+ return CompletableFuture(
+ self._j_job_client.stopWithSavepoint(advance_to_end_of_event_time, savepoint_directory),
+ str)
+
+ def trigger_savepoint(self, savepoint_directory):
+ """
+ Triggers a savepoint for the associated job. The savepoint will be written to the given
+ savepoint directory.
+
+ :param savepoint_directory: Directory the savepoint should be written to.
+ :type savepoint_directory: str
+ :return: A CompletableFuture containing the path where the savepoint is located.
+ :rtype: pyflink.common.CompletableFuture
+ """
+ return CompletableFuture(self._j_job_client.triggerSavepoint(savepoint_directory), str)
+
+ def get_accumulators(self, class_loader):
+ """
+ Requests the accumulators of the associated job. Accumulators can be requested while it
+ is running or after it has finished. The class loader is used to deserialize the incoming
+ accumulator results.
+
+ :param class_loader: Class loader used to deserialize the incoming accumulator results.
+ :return: A CompletableFuture containing the accumulators of the associated job.
+ :rtype: pyflink.common.CompletableFuture
+ """
+ return CompletableFuture(self._j_job_client.getAccumulators(class_loader), dict)
+
+ def get_job_execution_result(self, user_class_loader):
+ """
+ Returns the JobExecutionResult result of the job execution of the submitted job.
+
+ :param user_class_loader: Class loader used to deserialize the accumulators of the job.
+ :return: A CompletableFuture containing the JobExecutionResult result of the job execution.
+ :rtype: pyflink.common.CompletableFuture
+ """
+ return CompletableFuture(self._j_job_client.getJobExecutionResult(user_class_loader),
+ JobExecutionResult)
diff --git a/flink-python/pyflink/common/job_execution_result.py b/flink-python/pyflink/common/job_execution_result.py
index 24926bc..26ea95e 100644
--- a/flink-python/pyflink/common/job_execution_result.py
+++ b/flink-python/pyflink/common/job_execution_result.py
@@ -15,6 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
+from pyflink.common.job_id import JobID
__all__ = ['JobExecutionResult']
@@ -35,7 +36,7 @@ class JobExecutionResult(object):
:return: JobID, or null if the job has been executed on a runtime without JobIDs
or if the execution failed.
"""
- return self._j_job_execution_result.getJobID()
+ return JobID(self._j_job_execution_result.getJobID())
def is_job_execution_result(self):
"""
diff --git a/flink-python/pyflink/common/__init__.py b/flink-python/pyflink/common/job_id.py
similarity index 55%
copy from flink-python/pyflink/common/__init__.py
copy to flink-python/pyflink/common/job_id.py
index fcea204..49c1349 100644
--- a/flink-python/pyflink/common/__init__.py
+++ b/flink-python/pyflink/common/job_id.py
@@ -15,26 +15,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
+__all__ = ['JobID']
-"""
-Important classes used by both Flink Streaming and Batch API:
- - :class:`ExecutionConfig`:
- A config to define the behavior of the program execution.
-"""
-from pyflink.common.configuration import Configuration
-from pyflink.common.execution_config import ExecutionConfig
-from pyflink.common.execution_mode import ExecutionMode
-from pyflink.common.input_dependency_constraint import InputDependencyConstraint
-from pyflink.common.job_execution_result import JobExecutionResult
-from pyflink.common.restart_strategy import RestartStrategies, RestartStrategyConfiguration
+class JobID(object):
+ """
+ Unique (at least statistically unique) identifier for a Flink Job. Jobs in Flink correspond
+ to dataflow graphs.
-__all__ = [
- 'Configuration',
- 'ExecutionConfig',
- 'ExecutionMode',
- 'InputDependencyConstraint',
- 'JobExecutionResult',
- 'RestartStrategies',
- 'RestartStrategyConfiguration',
-]
+ Jobs act simultaneously as sessions, because jobs can be created and submitted incrementally
+ in different parts. Newer fragments of a graph can be attached to existing graphs, thereby
+ extending the current data flow graphs.
+ """
+
+ def __init__(self, j_job_id):
+ self._j_job_id = j_job_id
+
+ def __str__(self):
+ return self._j_job_id.toString()
diff --git a/flink-python/pyflink/common/job_status.py b/flink-python/pyflink/common/job_status.py
new file mode 100644
index 0000000..bbd4b33
--- /dev/null
+++ b/flink-python/pyflink/common/job_status.py
@@ -0,0 +1,167 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from pyflink.java_gateway import get_gateway
+
+__all__ = ['JobStatus']
+
+
+class JobStatus(object):
+ """
+ Possible states of a job once it has been accepted by the job manager.
+
+ :data:`CREATED`:
+
+ Job is newly created, no task has started to run.
+
+ :data:`RUNNING`:
+
+ Some tasks are scheduled or running, some may be pending, some may be finished.
+
+ :data:`FAILING`:
+
+ The job has failed and is currently waiting for the cleanup to complete.
+
+ :data:`FAILED`:
+
+ The job has failed with a non-recoverable task failure.
+
+ :data:`CANCELLING`:
+
+ Job is being cancelled.
+
+ :data:`CANCELED`:
+
+ Job has been cancelled.
+
+ :data:`FINISHED`:
+
+ All of the job's tasks have successfully finished.
+
+ :data:`RESTARTING`:
+
+ The job is currently undergoing a reset and total restart.
+
+ :data:`SUSPENDED`:
+
+ The job has been suspended which means that it has been stopped but not been removed from a
+ potential HA job store.
+
+ :data:`RECONCILING`:
+
+ The job is currently reconciling and waits for task execution report to recover state.
+ """
+
+ CREATED = 0
+ RUNNING = 1
+ FAILING = 2
+ FAILED = 3
+ CANCELLING = 4
+ CANCELED = 5
+ FINISHED = 6
+ RESTARTING = 7
+ SUSPENDED = 8
+ RECONCILING = 9
+
+ def __init__(self, j_job_status) -> None:
+ super().__init__()
+ self._j_job_status = j_job_status
+
+ def is_globally_terminal_state(self):
+ """
+ Checks whether this state is <i>globally terminal</i>. A globally terminal job
+ is complete and cannot fail any more and will not be restarted or recovered by another
+ standby master node.
+
+ When a globally terminal state has been reached, all recovery data for the job is
+ dropped from the high-availability services.
+
+ :return: ``True`` if this job status is globally terminal, ``False`` otherwise.
+ """
+ return self._j_job_status.isGloballyTerminalState()
+
+ def is_terminal_state(self):
+ """
+ Checks whether this state is locally terminal. Locally terminal refers to the
+ state of a job's execution graph within an executing JobManager. If the execution graph
+ is locally terminal, the JobManager will not continue executing or recovering the job.
+
+ The only state that is locally terminal, but not globally terminal is SUSPENDED,
+ which is typically entered when the executing JobManager looses its leader status.
+
+ :return: ``True`` if this job status is terminal, ``False`` otherwise.
+ """
+ return self._j_job_status.isTerminalState()
+
+ @staticmethod
+ def _from_j_job_status(j_job_status):
+ gateway = get_gateway()
+ JJobStatus = gateway.jvm.org.apache.flink.api.common.JobStatus
+ if j_job_status == JJobStatus.CREATED:
+ return JobStatus.CREATED
+ elif j_job_status == JJobStatus.RUNNING:
+ return JobStatus.RUNNING
+ elif j_job_status == JJobStatus.FAILING:
+ return JobStatus.FAILING
+ elif j_job_status == JJobStatus.FAILED:
+ return JobStatus.FAILED
+ elif j_job_status == JJobStatus.CANCELLING:
+ return JobStatus.CANCELLING
+ elif j_job_status == JJobStatus.CANCELED:
+ return JobStatus.CANCELED
+ elif j_job_status == JJobStatus.FINISHED:
+ return JobStatus.FINISHED
+ elif j_job_status == JJobStatus.RESTARTING:
+ return JobStatus.RESTARTING
+ elif j_job_status == JJobStatus.SUSPENDED:
+ return JobStatus.SUSPENDED
+ elif j_job_status == JJobStatus.RECONCILING:
+ return JobStatus.RECONCILING
+ else:
+ raise Exception("Unsupported java job status: %s" % j_job_status)
+
+ @staticmethod
+ def _to_j_job_status(job_status):
+ gateway = get_gateway()
+ JJobStatus = gateway.jvm.org.apache.flink.api.common.JobStatus
+ if job_status == JobStatus.CREATED:
+ return JJobStatus.CREATED
+ elif job_status == JobStatus.RUNNING:
+ return JJobStatus.RUNNING
+ elif job_status == JobStatus.FAILING:
+ return JJobStatus.FAILING
+ elif job_status == JobStatus.FAILED:
+ return JJobStatus.FAILED
+ elif job_status == JobStatus.CANCELLING:
+ return JJobStatus.CANCELLING
+ elif job_status == JobStatus.CANCELED:
+ return JJobStatus.CANCELED
+ elif job_status == JobStatus.FINISHED:
+ return JJobStatus.FINISHED
+ elif job_status == JobStatus.RESTARTING:
+ return JJobStatus.RESTARTING
+ elif job_status == JobStatus.SUSPENDED:
+ return JJobStatus.SUSPENDED
+ elif job_status == JobStatus.RECONCILING:
+ return JJobStatus.RECONCILING
+ else:
+ raise TypeError("Unsupported job status: %s, supported job statuses are: "
+ "JobStatus.CREATED, JobStatus.RUNNING, "
+ "JobStatus.FAILING, JobStatus.FAILED, "
+ "JobStatus.CANCELLING, JobStatus.CANCELED, "
+ "JobStatus.FINISHED, JobStatus.RESTARTING, "
+ "JobStatus.SUSPENDED and JobStatus.RECONCILING." % job_status)
diff --git a/flink-python/pyflink/table/__init__.py b/flink-python/pyflink/table/__init__.py
index 010434c..12bdf73 100644
--- a/flink-python/pyflink/table/__init__.py
+++ b/flink-python/pyflink/table/__init__.py
@@ -61,43 +61,47 @@ Important classes of Flink Table API:
from __future__ import absolute_import
from pyflink.table.environment_settings import EnvironmentSettings
+from pyflink.table.explain_detail import ExplainDetail
+from pyflink.table.result_kind import ResultKind
+from pyflink.table.sinks import CsvTableSink, TableSink, WriteMode
+from pyflink.table.sources import CsvTableSource, TableSource
from pyflink.table.sql_dialect import SqlDialect
-from pyflink.table.table import Table, GroupedTable, GroupWindowedTable, OverWindowedTable, \
+from pyflink.table.statement_set import StatementSet
+from pyflink.table.table import GroupWindowedTable, GroupedTable, OverWindowedTable, Table, \
WindowGroupedTable
from pyflink.table.table_config import TableConfig
from pyflink.table.table_environment import (TableEnvironment, StreamTableEnvironment,
BatchTableEnvironment)
-from pyflink.table.sinks import TableSink, CsvTableSink, WriteMode
-from pyflink.table.sources import TableSource, CsvTableSource
-from pyflink.table.types import DataTypes, UserDefinedType, Row
+from pyflink.table.table_result import TableResult
from pyflink.table.table_schema import TableSchema
+from pyflink.table.types import DataTypes, UserDefinedType, Row
from pyflink.table.udf import FunctionContext, ScalarFunction
-from pyflink.table.explain_detail import ExplainDetail
-from pyflink.table.statement_set import StatementSet
__all__ = [
- 'TableEnvironment',
- 'StreamTableEnvironment',
'BatchTableEnvironment',
+ 'CsvTableSink',
+ 'CsvTableSource',
+ 'DataTypes',
'EnvironmentSettings',
- 'Table',
- 'GroupedTable',
+ 'ExplainDetail',
+ 'FunctionContext',
'GroupWindowedTable',
+ 'GroupedTable',
'OverWindowedTable',
- 'WindowGroupedTable',
+ 'ResultKind',
+ 'Row',
+ 'ScalarFunction',
+ 'SqlDialect',
+ 'StatementSet',
+ 'StreamTableEnvironment',
+ 'Table',
'TableConfig',
+ 'TableEnvironment',
+ 'TableResult',
+ 'TableSchema',
'TableSink',
'TableSource',
- 'WriteMode',
- 'CsvTableSink',
- 'CsvTableSource',
- 'DataTypes',
'UserDefinedType',
- 'Row',
- 'TableSchema',
- 'FunctionContext',
- 'ScalarFunction',
- 'SqlDialect',
- 'ExplainDetail',
- 'StatementSet'
+ 'WindowGroupedTable',
+ 'WriteMode'
]
diff --git a/flink-python/pyflink/common/__init__.py b/flink-python/pyflink/table/result_kind.py
similarity index 50%
copy from flink-python/pyflink/common/__init__.py
copy to flink-python/pyflink/table/result_kind.py
index fcea204..68c324d 100644
--- a/flink-python/pyflink/common/__init__.py
+++ b/flink-python/pyflink/table/result_kind.py
@@ -15,26 +15,35 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
+from pyflink.java_gateway import get_gateway
-"""
-Important classes used by both Flink Streaming and Batch API:
-
- - :class:`ExecutionConfig`:
- A config to define the behavior of the program execution.
-"""
-from pyflink.common.configuration import Configuration
-from pyflink.common.execution_config import ExecutionConfig
-from pyflink.common.execution_mode import ExecutionMode
-from pyflink.common.input_dependency_constraint import InputDependencyConstraint
-from pyflink.common.job_execution_result import JobExecutionResult
-from pyflink.common.restart_strategy import RestartStrategies, RestartStrategyConfiguration
-
-__all__ = [
- 'Configuration',
- 'ExecutionConfig',
- 'ExecutionMode',
- 'InputDependencyConstraint',
- 'JobExecutionResult',
- 'RestartStrategies',
- 'RestartStrategyConfiguration',
-]
+__all__ = ['ResultKind']
+
+
+class ResultKind(object):
+ """
+ ResultKind defines the types of the result.
+
+ :data:`SUCCESS`:
+
+ The statement (e.g. DDL, USE) executes successfully, and the result only contains a simple "OK".
+
+ :data:`SUCCESS_WITH_CONTENT`:
+
+ The statement (e.g. DML, DQL, SHOW) executes successfully, and the result contains important
+ content.
+ """
+
+ SUCCESS = 0
+ SUCCESS_WITH_CONTENT = 1
+
+ @staticmethod
+ def _from_j_result_kind(j_result_kind):
+ gateway = get_gateway()
+ JResultKind = gateway.jvm.org.apache.flink.table.api.ResultKind
+ if j_result_kind == JResultKind.SUCCESS:
+ return ResultKind.SUCCESS
+ elif j_result_kind == JResultKind.SUCCESS_WITH_CONTENT:
+ return ResultKind.SUCCESS_WITH_CONTENT
+ else:
+ raise Exception("Unsupported Java result kind: %s" % j_result_kind)
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index d2ec220..1d2be8f 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -34,6 +34,7 @@ from pyflink.table.descriptors import StreamTableDescriptor, BatchTableDescripto
from pyflink.java_gateway import get_gateway
from pyflink.table import Table
+from pyflink.table.table_result import TableResult
from pyflink.table.types import _to_java_type, _create_type_verifier, RowType, DataType, \
_infer_schema_from_data, _create_converter, from_arrow_type, RowField, create_arrow_schema
from pyflink.util import utils
@@ -531,8 +532,7 @@ class TableEnvironment(object):
the affected row count for `DML` (-1 means unknown),
or a string message ("OK") for other statements.
"""
- # TODO convert java TableResult to python TableResult once FLINK-17303 is finished
- return self._j_tenv.executeSql(stmt)
+ return TableResult(self._j_tenv.executeSql(stmt))
def create_statement_set(self):
"""
diff --git a/flink-python/pyflink/table/table_result.py b/flink-python/pyflink/table/table_result.py
new file mode 100644
index 0000000..5071424
--- /dev/null
+++ b/flink-python/pyflink/table/table_result.py
@@ -0,0 +1,69 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from pyflink.common.job_client import JobClient
+from pyflink.table.result_kind import ResultKind
+from pyflink.table.table_schema import TableSchema
+
+__all__ = ['TableResult']
+
+
+class TableResult(object):
+ """
+ A :class:`~pyflink.table.TableResult` is the representation of the statement execution result.
+ """
+
+ def __init__(self, j_table_result):
+ self._j_table_result = j_table_result
+
+ def get_job_client(self):
+ """
+ For DML and DQL statement, return the JobClient which associates the submitted Flink job.
+ For other statements (e.g. DDL, DCL) return empty.
+
+ :return: The job client, optional.
+ :rtype: pyflink.common.JobClient
+ """
+ job_client = self._j_table_result.getJobClient()
+ if job_client.isPresent():
+ return JobClient(job_client.get())
+ else:
+ return None
+
+ def get_table_schema(self):
+ """
+ Get the schema of result.
+
+ :return: The schema of result.
+ :rtype: pyflink.table.TableSchema
+ """
+ return TableSchema(j_table_schema=self._j_table_result.getTableSchema())
+
+ def get_result_kind(self):
+ """
+ Return the ResultKind which represents the result type.
+
+ :return: The result kind.
+ :rtype: pyflink.table.ResultKind
+ """
+ return ResultKind._from_j_result_kind(self._j_table_result.getResultKind())
+
+ def print(self):
+ """
+ Print the result contents as tableau form to client console.
+ """
+ self._j_table_result.print()
diff --git a/flink-python/pyflink/table/tests/test_sql.py b/flink-python/pyflink/table/tests/test_sql.py
index 61de9b1..c8c0ee1 100644
--- a/flink-python/pyflink/table/tests/test_sql.py
+++ b/flink-python/pyflink/table/tests/test_sql.py
@@ -21,10 +21,8 @@ import subprocess
import unittest
from pyflink.find_flink_home import _find_flink_source_root
-
from pyflink.java_gateway import get_gateway
-
-from pyflink.table import DataTypes
+from pyflink.table import DataTypes, ResultKind
from pyflink.testing import source_sink_utils
from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase, PyFlinkBatchTableTestCase
@@ -58,6 +56,49 @@ class StreamSqlTests(SqlTests, PyFlinkStreamTableTestCase):
expected = ['2,Hi,Hello', '3,Hello,Hello']
self.assert_equals(actual, expected)
+ def test_execute_sql(self):
+ t_env = self.t_env
+ table_result = t_env.execute_sql("create table tbl"
+ "("
+ " a bigint,"
+ " b int,"
+ " c varchar"
+ ") with ("
+ " 'connector' = 'COLLECTION',"
+ " 'is-bounded' = 'false'"
+ ")")
+ self.assertIsNone(table_result.get_job_client())
+ self.assert_equals(table_result.get_table_schema().get_field_names(), ["result"])
+ self.assertEqual(table_result.get_result_kind(), ResultKind.SUCCESS)
+ table_result.print()
+
+ table_result = t_env.execute_sql("alter table tbl set ('k1' = 'a', 'k2' = 'b')")
+ self.assertIsNone(table_result.get_job_client())
+ self.assert_equals(table_result.get_table_schema().get_field_names(), ["result"])
+ self.assertEqual(table_result.get_result_kind(), ResultKind.SUCCESS)
+ table_result.print()
+
+ field_names = ["k1", "k2", "c"]
+ field_types = [DataTypes.BIGINT(), DataTypes.INT(), DataTypes.STRING()]
+ t_env.register_table_sink(
+ "sinks",
+ source_sink_utils.TestAppendSink(field_names, field_types))
+ table_result = t_env.execute_sql("insert into sinks select * from tbl")
+ job_execution_result = table_result.get_job_client().get_job_execution_result(
+ get_gateway().jvm.Thread.currentThread().getContextClassLoader()).result()
+ self.assertIsNotNone(job_execution_result.get_job_id())
+ self.assertIsNotNone(job_execution_result.get_job_execution_result())
+ self.assert_equals(table_result.get_table_schema().get_field_names(),
+ ["default_catalog.default_database.sinks"])
+ self.assertEqual(table_result.get_result_kind(), ResultKind.SUCCESS_WITH_CONTENT)
+ table_result.print()
+
+ table_result = t_env.execute_sql("drop table tbl")
+ self.assertIsNone(table_result.get_job_client())
+ self.assert_equals(table_result.get_table_schema().get_field_names(), ["result"])
+ self.assertEqual(table_result.get_result_kind(), ResultKind.SUCCESS)
+ table_result.print()
+
def test_sql_update(self):
t_env = self.t_env
source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])