You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ji...@apache.org on 2019/07/10 09:54:11 UTC
[flink] branch master updated: [hotfix][python] Align with Java
Table API to remove QueryConfig (#9063)
This is an automated email from the ASF dual-hosted git repository.
jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 8acc1d3 [hotfix][python] Align with Java Table API to remove QueryConfig (#9063)
8acc1d3 is described below
commit 8acc1d39ba4d37392aba322ebae635d213fcdd72
Author: WeiZhong94 <44...@users.noreply.github.com>
AuthorDate: Wed Jul 10 17:53:54 2019 +0800
[hotfix][python] Align with Java Table API to remove QueryConfig (#9063)
---
docs/dev/table/streaming/query_configuration.md | 14 +-
docs/dev/table/streaming/query_configuration.zh.md | 14 +-
flink-python/pyflink/table/__init__.py | 3 -
flink-python/pyflink/table/query_config.py | 121 ----------------
flink-python/pyflink/table/table_config.py | 155 ++++++++++++++++++++-
.../table/tests/test_table_config_completeness.py | 58 ++++++++
.../table/tests/test_table_environment_api.py | 28 +++-
7 files changed, 243 insertions(+), 150 deletions(-)
diff --git a/docs/dev/table/streaming/query_configuration.md b/docs/dev/table/streaming/query_configuration.md
index 1e0527d..dbb18bc 100644
--- a/docs/dev/table/streaming/query_configuration.md
+++ b/docs/dev/table/streaming/query_configuration.md
@@ -91,13 +91,13 @@ val stream: DataStream[Row] = result.toAppendStream[Row](qConfig)
</div>
<div data-lang="python" markdown="1">
{% highlight python %}
-env = StreamExecutionEnvironment.get_execution_environment()
-table_env = StreamTableEnvironment.create(env)
-
-# obtain query configuration from TableEnvironment
-q_config = StreamQueryConfig()
+# use TableConfig instead of QueryConfig in the Python API
+t_config = TableConfig()
# set query parameters
-q_config.with_idle_state_retention_time(timedelta(hours=12), timedelta(hours=24))
+t_config.set_idle_state_retention_time(timedelta(hours=12), timedelta(hours=24))
+
+env = StreamExecutionEnvironment.get_execution_environment()
+table_env = StreamTableEnvironment.create(env, t_config)
# define query
result = ...
@@ -110,7 +110,7 @@ table_env.register_table_sink("outputTable", # table name
sink) # table sink
# emit result Table via a TableSink
-result.insert_into("outputTable", q_config)
+result.insert_into("outputTable")
{% endhighlight %}
</div>
diff --git a/docs/dev/table/streaming/query_configuration.zh.md b/docs/dev/table/streaming/query_configuration.zh.md
index 1e0527d..dbb18bc 100644
--- a/docs/dev/table/streaming/query_configuration.zh.md
+++ b/docs/dev/table/streaming/query_configuration.zh.md
@@ -91,13 +91,13 @@ val stream: DataStream[Row] = result.toAppendStream[Row](qConfig)
</div>
<div data-lang="python" markdown="1">
{% highlight python %}
-env = StreamExecutionEnvironment.get_execution_environment()
-table_env = StreamTableEnvironment.create(env)
-
-# obtain query configuration from TableEnvironment
-q_config = StreamQueryConfig()
+# use TableConfig instead of QueryConfig in the Python API
+t_config = TableConfig()
# set query parameters
-q_config.with_idle_state_retention_time(timedelta(hours=12), timedelta(hours=24))
+t_config.set_idle_state_retention_time(timedelta(hours=12), timedelta(hours=24))
+
+env = StreamExecutionEnvironment.get_execution_environment()
+table_env = StreamTableEnvironment.create(env, t_config)
# define query
result = ...
@@ -110,7 +110,7 @@ table_env.register_table_sink("outputTable", # table name
sink) # table sink
# emit result Table via a TableSink
-result.insert_into("outputTable", q_config)
+result.insert_into("outputTable")
{% endhighlight %}
</div>
diff --git a/flink-python/pyflink/table/__init__.py b/flink-python/pyflink/table/__init__.py
index ac5991d..48a150e 100644
--- a/flink-python/pyflink/table/__init__.py
+++ b/flink-python/pyflink/table/__init__.py
@@ -53,7 +53,6 @@ Important classes of Flink Table API:
"""
from __future__ import absolute_import
-from pyflink.table.query_config import BatchQueryConfig, StreamQueryConfig
from pyflink.table.table import Table, GroupedTable, GroupWindowedTable, OverWindowedTable, \
WindowGroupedTable
from pyflink.table.table_config import TableConfig
@@ -74,8 +73,6 @@ __all__ = [
'OverWindowedTable',
'WindowGroupedTable',
'TableConfig',
- 'StreamQueryConfig',
- 'BatchQueryConfig',
'TableSink',
'TableSource',
'WriteMode',
diff --git a/flink-python/pyflink/table/query_config.py b/flink-python/pyflink/table/query_config.py
deleted file mode 100644
index 69b6488..0000000
--- a/flink-python/pyflink/table/query_config.py
+++ /dev/null
@@ -1,121 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-from abc import ABCMeta
-from datetime import timedelta
-from py4j.compat import long
-
-from pyflink.java_gateway import get_gateway
-
-
-class QueryConfig(object):
- """
- The :class:`QueryConfig` holds parameters to configure the behavior of queries.
- """
-
- __metaclass__ = ABCMeta
-
- def __init__(self, j_query_config):
- self._j_query_config = j_query_config
-
-
-class StreamQueryConfig(QueryConfig):
- """
- The :class:`StreamQueryConfig` holds parameters to configure the behavior of streaming queries.
-
- Example:
- ::
-
- >>> query_config = StreamQueryConfig() \\
- ... .with_idle_state_retention_time(datetime.timedelta(days=1),
- ... datetime.timedelta(days=3))
- >>> table_env.sql_update("...", query_config)
-
- """
-
- def __init__(self, j_stream_query_config=None):
- if j_stream_query_config is not None:
- self._j_stream_query_config = j_stream_query_config
- else:
- self._j_stream_query_config = get_gateway().jvm.StreamQueryConfig()
- super(StreamQueryConfig, self).__init__(self._j_stream_query_config)
-
- def with_idle_state_retention_time(self, min_time, max_time):
- """
- Specifies a minimum and a maximum time interval for how long idle state, i.e., state which
- was not updated, will be retained.
-
- State will never be cleared until it was idle for less than the minimum time and will never
- be kept if it was idle for more than the maximum time.
-
- When new data arrives for previously cleaned-up state, the new data will be handled as if it
- was the first data. This can result in previous results being overwritten.
-
- Set to 0 (zero) to never clean-up the state.
-
- .. note::
-
- Cleaning up state requires additional bookkeeping which becomes less expensive for
- larger differences of minTime and maxTime. The difference between minTime and maxTime
- must be at least 5 minutes.
-
- :param min_time: The minimum time interval for which idle state is retained. Set to
- 0 (zero) to never clean-up the state.
- :param max_time: The maximum time interval for which idle state is retained. Must be at
- least 5 minutes greater than minTime. Set to
- 0 (zero) to never clean-up the state.
- :return: :class:`StreamQueryConfig`
- """
- # type: (timedelta, timedelta) -> StreamQueryConfig
- j_time_class = get_gateway().jvm.org.apache.flink.api.common.time.Time
- j_min_time = j_time_class.milliseconds(long(round(min_time.total_seconds() * 1000)))
- j_max_time = j_time_class.milliseconds(long(round(max_time.total_seconds() * 1000)))
- self._j_stream_query_config = \
- self._j_stream_query_config.withIdleStateRetentionTime(j_min_time, j_max_time)
- return self
-
- def get_min_idle_state_retention_time(self):
- """
- State might be cleared and removed if it was not updated for the defined period of time.
-
- :return: The minimum time until state which was not updated will be retained.
- """
- # type: () -> int
- return self._j_stream_query_config.getMinIdleStateRetentionTime()
-
- def get_max_idle_state_retention_time(self):
- """
- State will be cleared and removed if it was not updated for the defined period of time.
-
- :return: The maximum time until state which was not updated will be retained.
- """
- # type: () -> int
- return self._j_stream_query_config.getMaxIdleStateRetentionTime()
-
-
-class BatchQueryConfig(QueryConfig):
- """
- The :class:`BatchQueryConfig` holds parameters to configure the behavior of batch queries.
- """
-
- def __init__(self, j_batch_query_config=None):
- self._jvm = get_gateway().jvm
- if j_batch_query_config is not None:
- self._j_batch_query_config = j_batch_query_config
- else:
- self._j_batch_query_config = self._jvm.BatchQueryConfig()
- super(BatchQueryConfig, self).__init__(self._j_batch_query_config)
diff --git a/flink-python/pyflink/table/table_config.py b/flink-python/pyflink/table/table_config.py
index d6b5864..8e0fbec 100644
--- a/flink-python/pyflink/table/table_config.py
+++ b/flink-python/pyflink/table/table_config.py
@@ -17,6 +17,7 @@
################################################################################
import sys
+from py4j.compat import long
from pyflink.java_gateway import get_gateway
__all__ = ['TableConfig']
@@ -30,11 +31,12 @@ class TableConfig(object):
A config to define the runtime behavior of the Table API.
"""
- def __init__(self):
- self._jvm = get_gateway().jvm
- self._j_table_config = self._jvm.TableConfig()
- self._is_stream = None # type: bool
- self._parallelism = None # type: int
+ def __init__(self, j_table_config=None):
+ gateway = get_gateway()
+ if j_table_config is None:
+ self._j_table_config = gateway.jvm.TableConfig()
+ else:
+ self._j_table_config = j_table_config
def get_timezone(self):
"""
@@ -52,7 +54,7 @@ class TableConfig(object):
"GMT-8:00".
"""
if timezone_id is not None and isinstance(timezone_id, (str, unicode)):
- j_timezone = self._jvm.java.util.TimeZone.getTimeZone(timezone_id)
+ j_timezone = get_gateway().jvm.java.util.TimeZone.getTimeZone(timezone_id)
self._j_table_config.setTimeZone(j_timezone)
else:
raise Exception("TableConfig.timezone should be a string!")
@@ -90,3 +92,144 @@ class TableConfig(object):
self._j_table_config.setMaxGeneratedCodeLength(max_generated_code_length)
else:
raise Exception("TableConfig.max_generated_code_length should be a int value!")
+
+ def set_idle_state_retention_time(self, min_time, max_time):
+ """
+ Specifies a minimum and a maximum time interval for how long idle state, i.e., state which
+ was not updated, will be retained.
+
+ State will never be cleared while it has been idle for less than the minimum time and will
+ never be kept once it has been idle for more than the maximum time.
+
+ When new data arrives for previously cleaned-up state, the new data will be handled as if it
+ was the first data. This can result in previous results being overwritten.
+
+ Set to 0 (zero) to never clean-up the state.
+
+ Example:
+ ::
+
+ >>> table_config = TableConfig()
+ >>> table_config.set_idle_state_retention_time(datetime.timedelta(days=1),
+ ... datetime.timedelta(days=3))
+
+ .. note::
+
+ Cleaning up state requires additional bookkeeping which becomes less expensive for
+ larger differences between min_time and max_time. The difference between min_time and
+ max_time must be at least 5 minutes.
+
+ :param min_time: The minimum time interval for which idle state is retained. Set to
+ 0 (zero) to never clean-up the state.
+ :type min_time: datetime.timedelta
+ :param max_time: The maximum time interval for which idle state is retained. Must be at
+ least 5 minutes greater than min_time. Set to
+ 0 (zero) to never clean-up the state.
+ :type max_time: datetime.timedelta
+ """
+ j_time_class = get_gateway().jvm.org.apache.flink.api.common.time.Time
+ j_min_time = j_time_class.milliseconds(long(round(min_time.total_seconds() * 1000)))
+ j_max_time = j_time_class.milliseconds(long(round(max_time.total_seconds() * 1000)))
+ self._j_table_config.setIdleStateRetentionTime(j_min_time, j_max_time)
+
+ def get_min_idle_state_retention_time(self):
+ """
+ State might be cleared and removed if it was not updated for the defined period of time.
+
+ :return: The minimum time, in milliseconds, for which state that was not updated is retained.
+ :rtype: int
+ """
+ return self._j_table_config.getMinIdleStateRetentionTime()
+
+ def get_max_idle_state_retention_time(self):
+ """
+ State will be cleared and removed if it was not updated for the defined period of time.
+
+ :return: The maximum time, in milliseconds, for which state that was not updated is retained.
+ :rtype: int
+ """
+ return self._j_table_config.getMaxIdleStateRetentionTime()
+
+ def set_decimal_context(self, precision, rounding_mode):
+ """
+ Sets the default context for decimal division calculation.
+ (precision=34, rounding_mode=HALF_EVEN) by default.
+
+ The precision is the number of digits to be used for an operation. A value of 0 indicates
+ that unlimited precision (as many digits as are required) will be used. Note that leading
+ zeros (in the coefficient of a number) are never significant.
+
+ The rounding mode is the rounding algorithm to be used for an operation. It could be:
+
+ **UP**, **DOWN**, **CEILING**, **FLOOR**, **HALF_UP**, **HALF_DOWN**, **HALF_EVEN**,
+ **UNNECESSARY**
+
+ The table below shows the results of rounding input to one digit with the given rounding
+ mode:
+
+ +-------+----+------+---------+-------+---------+-----------+-----------+-------------+
+ | Input | UP | DOWN | CEILING | FLOOR | HALF_UP | HALF_DOWN | HALF_EVEN | UNNECESSARY |
+ +=======+====+======+=========+=======+=========+===========+===========+=============+
+ | 5.5 | 6 | 5 | 6 | 5 | 6 | 5 | 6 | Exception |
+ +-------+----+------+---------+-------+---------+-----------+-----------+-------------+
+ | 2.5 | 3 | 2 | 3 | 2 | 3 | 2 | 2 | Exception |
+ +-------+----+------+---------+-------+---------+-----------+-----------+-------------+
+ | 1.6 | 2 | 1 | 2 | 1 | 2 | 2 | 2 | Exception |
+ +-------+----+------+---------+-------+---------+-----------+-----------+-------------+
+ | 1.1 | 2 | 1 | 2 | 1 | 1 | 1 | 1 | Exception |
+ +-------+----+------+---------+-------+---------+-----------+-----------+-------------+
+ | 1.0 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 |
+ +-------+----+------+---------+-------+---------+-----------+-----------+-------------+
+ | -1.0 | -1 | -1 | -1 | -1 | -1 | -1 | -1 | -1 |
+ +-------+----+------+---------+-------+---------+-----------+-----------+-------------+
+ | -1.1 | -2 | -1 | -1 | -2 | -1 | -1 | -1 | Exception |
+ +-------+----+------+---------+-------+---------+-----------+-----------+-------------+
+ | -1.6 | -2 | -1 | -1 | -2 | -2 | -2 | -2 | Exception |
+ +-------+----+------+---------+-------+---------+-----------+-----------+-------------+
+ | -2.5 | -3 | -2 | -2 | -3 | -3 | -2 | -2 | Exception |
+ +-------+----+------+---------+-------+---------+-----------+-----------+-------------+
+ | -5.5 | -6 | -5 | -5 | -6 | -6 | -5 | -6 | Exception |
+ +-------+----+------+---------+-------+---------+-----------+-----------+-------------+
+
+ :param precision: The precision of the decimal context.
+ :type precision: int
+ :param rounding_mode: The rounding mode of the decimal context.
+ :type rounding_mode: str
+ """
+ if rounding_mode not in (
+ "UP",
+ "DOWN",
+ "CEILING",
+ "FLOOR",
+ "HALF_UP",
+ "HALF_DOWN",
+ "HALF_EVEN",
+ "UNNECESSARY"):
+ raise ValueError("Unsupported rounding_mode: %s" % rounding_mode)
+ gateway = get_gateway()
+ j_rounding_mode = getattr(gateway.jvm.java.math.RoundingMode, rounding_mode)
+ j_math_context = gateway.jvm.java.math.MathContext(precision, j_rounding_mode)
+ self._j_table_config.setDecimalContext(j_math_context)
+
+ def get_decimal_context(self):
+ """
+ Returns current context for decimal division calculation,
+ (precision=34, rounding_mode=HALF_EVEN) by default.
+
+ .. seealso:: :func:`set_decimal_context`
+
+ :return: the current context for decimal division calculation.
+ :rtype: (int, str)
+ """
+ j_math_context = self._j_table_config.getDecimalContext()
+ precision = j_math_context.getPrecision()
+ rounding_mode = j_math_context.getRoundingMode().name()
+ return precision, rounding_mode
+
+ @staticmethod
+ def get_default():
+ """
+ :return: A TableConfig object with default settings.
+ :rtype: TableConfig
+ """
+ return TableConfig(get_gateway().jvm.TableConfig.getDefault())
diff --git a/flink-python/pyflink/table/tests/test_table_config_completeness.py b/flink-python/pyflink/table/tests/test_table_config_completeness.py
new file mode 100644
index 0000000..9db60cf
--- /dev/null
+++ b/flink-python/pyflink/table/tests/test_table_config_completeness.py
@@ -0,0 +1,58 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import unittest
+
+from pyflink.testing.test_case_utils import PythonAPICompletenessTestCase
+from pyflink.table import TableConfig
+
+
+class TableConfigCompletenessTests(PythonAPICompletenessTestCase, unittest.TestCase):
+ """
+ Tests whether the Python :class:`TableConfig` is consistent with
+ Java `org.apache.flink.table.api.TableConfig`.
+ """
+
+ @classmethod
+ def python_class(cls):
+ return TableConfig
+
+ @classmethod
+ def java_class(cls):
+ return "org.apache.flink.table.api.TableConfig"
+
+ @classmethod
+ def excluded_methods(cls):
+ # internal interfaces, no need to expose to users.
+ return {'getPlannerConfig', 'setPlannerConfig'}
+
+ @classmethod
+ def java_method_name(cls, python_method_name):
+ # Most time zone related libraries in Python use 'timezone' instead of 'time_zone'.
+ return {'get_timezone': 'get_time_zone',
+ 'set_timezone': 'set_time_zone'}.get(python_method_name, python_method_name)
+
+if __name__ == '__main__':
+ import unittest
+
+ try:
+ import xmlrunner
+ testRunner = xmlrunner.XMLTestRunner(output='target/test-reports')
+ except ImportError:
+ testRunner = None
+ unittest.main(testRunner=testRunner, verbosity=2)
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py
index 3e43d5f..f85d6af 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -22,7 +22,6 @@ from py4j.compat import unicode
from pyflink.dataset import ExecutionEnvironment
from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.table import StreamQueryConfig
from pyflink.table.table_environment import BatchTableEnvironment, StreamTableEnvironment
from pyflink.table.table_config import TableConfig
from pyflink.table.types import DataTypes, RowType
@@ -137,14 +136,31 @@ class StreamTableEnvironmentTests(PyFlinkStreamTableTestCase):
expected = ['1,Hi,Hello', '2,Hello,Hello']
self.assert_equals(actual, expected)
- def test_query_config(self):
- query_config = StreamQueryConfig()
+ def test_table_config(self):
- query_config.with_idle_state_retention_time(
+ table_config = TableConfig.get_default()
+ table_config.set_idle_state_retention_time(
datetime.timedelta(days=1), datetime.timedelta(days=2))
- self.assertEqual(2 * 24 * 3600 * 1000, query_config.get_max_idle_state_retention_time())
- self.assertEqual(24 * 3600 * 1000, query_config.get_min_idle_state_retention_time())
+ self.assertEqual(2 * 24 * 3600 * 1000, table_config.get_max_idle_state_retention_time())
+ self.assertEqual(24 * 3600 * 1000, table_config.get_min_idle_state_retention_time())
+
+ table_config.set_decimal_context(20, "UNNECESSARY")
+ self.assertEqual((20, "UNNECESSARY"), table_config.get_decimal_context())
+ table_config.set_decimal_context(20, "HALF_EVEN")
+ self.assertEqual((20, "HALF_EVEN"), table_config.get_decimal_context())
+ table_config.set_decimal_context(20, "HALF_DOWN")
+ self.assertEqual((20, "HALF_DOWN"), table_config.get_decimal_context())
+ table_config.set_decimal_context(20, "HALF_UP")
+ self.assertEqual((20, "HALF_UP"), table_config.get_decimal_context())
+ table_config.set_decimal_context(20, "FLOOR")
+ self.assertEqual((20, "FLOOR"), table_config.get_decimal_context())
+ table_config.set_decimal_context(20, "CEILING")
+ self.assertEqual((20, "CEILING"), table_config.get_decimal_context())
+ table_config.set_decimal_context(20, "DOWN")
+ self.assertEqual((20, "DOWN"), table_config.get_decimal_context())
+ table_config.set_decimal_context(20, "UP")
+ self.assertEqual((20, "UP"), table_config.get_decimal_context())
def test_create_table_environment(self):
table_config = TableConfig()