You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ji...@apache.org on 2019/07/10 09:54:11 UTC

[flink] branch master updated: [hotfix][python] Align with Java Table API to remove QueryConfig (#9063)

This is an automated email from the ASF dual-hosted git repository.

jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8acc1d3  [hotfix][python] Align with Java Table API to remove QueryConfig (#9063)
8acc1d3 is described below

commit 8acc1d39ba4d37392aba322ebae635d213fcdd72
Author: WeiZhong94 <44...@users.noreply.github.com>
AuthorDate: Wed Jul 10 17:53:54 2019 +0800

    [hotfix][python] Align with Java Table API to remove QueryConfig (#9063)
---
 docs/dev/table/streaming/query_configuration.md    |  14 +-
 docs/dev/table/streaming/query_configuration.zh.md |  14 +-
 flink-python/pyflink/table/__init__.py             |   3 -
 flink-python/pyflink/table/query_config.py         | 121 ----------------
 flink-python/pyflink/table/table_config.py         | 155 ++++++++++++++++++++-
 .../table/tests/test_table_config_completeness.py  |  58 ++++++++
 .../table/tests/test_table_environment_api.py      |  28 +++-
 7 files changed, 243 insertions(+), 150 deletions(-)

diff --git a/docs/dev/table/streaming/query_configuration.md b/docs/dev/table/streaming/query_configuration.md
index 1e0527d..dbb18bc 100644
--- a/docs/dev/table/streaming/query_configuration.md
+++ b/docs/dev/table/streaming/query_configuration.md
@@ -91,13 +91,13 @@ val stream: DataStream[Row] = result.toAppendStream[Row](qConfig)
 </div>
 <div data-lang="python" markdown="1">
 {% highlight python %}
-env = StreamExecutionEnvironment.get_execution_environment()
-table_env = StreamTableEnvironment.create(env)
-
-# obtain query configuration from TableEnvironment
-q_config = StreamQueryConfig()
+# use TableConfig instead of QueryConfig in python API
+t_config = TableConfig()
 # set query parameters
-q_config.with_idle_state_retention_time(timedelta(hours=12), timedelta(hours=24))
+t_config.set_idle_state_retention_time(timedelta(hours=12), timedelta(hours=24))
+
+env = StreamExecutionEnvironment.get_execution_environment()
+table_env = StreamTableEnvironment.create(env, t_config)
 
 # define query
 result = ...
@@ -110,7 +110,7 @@ table_env.register_table_sink("outputTable",  # table name
                               sink)  # table sink
 
 # emit result Table via a TableSink
-result.insert_into("outputTable", q_config)
+result.insert_into("outputTable")
 
 {% endhighlight %}
 </div>
diff --git a/docs/dev/table/streaming/query_configuration.zh.md b/docs/dev/table/streaming/query_configuration.zh.md
index 1e0527d..dbb18bc 100644
--- a/docs/dev/table/streaming/query_configuration.zh.md
+++ b/docs/dev/table/streaming/query_configuration.zh.md
@@ -91,13 +91,13 @@ val stream: DataStream[Row] = result.toAppendStream[Row](qConfig)
 </div>
 <div data-lang="python" markdown="1">
 {% highlight python %}
-env = StreamExecutionEnvironment.get_execution_environment()
-table_env = StreamTableEnvironment.create(env)
-
-# obtain query configuration from TableEnvironment
-q_config = StreamQueryConfig()
+# use TableConfig instead of QueryConfig in python API
+t_config = TableConfig()
 # set query parameters
-q_config.with_idle_state_retention_time(timedelta(hours=12), timedelta(hours=24))
+t_config.set_idle_state_retention_time(timedelta(hours=12), timedelta(hours=24))
+
+env = StreamExecutionEnvironment.get_execution_environment()
+table_env = StreamTableEnvironment.create(env, t_config)
 
 # define query
 result = ...
@@ -110,7 +110,7 @@ table_env.register_table_sink("outputTable",  # table name
                               sink)  # table sink
 
 # emit result Table via a TableSink
-result.insert_into("outputTable", q_config)
+result.insert_into("outputTable")
 
 {% endhighlight %}
 </div>
diff --git a/flink-python/pyflink/table/__init__.py b/flink-python/pyflink/table/__init__.py
index ac5991d..48a150e 100644
--- a/flink-python/pyflink/table/__init__.py
+++ b/flink-python/pyflink/table/__init__.py
@@ -53,7 +53,6 @@ Important classes of Flink Table API:
 """
 from __future__ import absolute_import
 
-from pyflink.table.query_config import BatchQueryConfig, StreamQueryConfig
 from pyflink.table.table import Table, GroupedTable, GroupWindowedTable, OverWindowedTable, \
     WindowGroupedTable
 from pyflink.table.table_config import TableConfig
@@ -74,8 +73,6 @@ __all__ = [
     'OverWindowedTable',
     'WindowGroupedTable',
     'TableConfig',
-    'StreamQueryConfig',
-    'BatchQueryConfig',
     'TableSink',
     'TableSource',
     'WriteMode',
diff --git a/flink-python/pyflink/table/query_config.py b/flink-python/pyflink/table/query_config.py
deleted file mode 100644
index 69b6488..0000000
--- a/flink-python/pyflink/table/query_config.py
+++ /dev/null
@@ -1,121 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-from abc import ABCMeta
-from datetime import timedelta
-from py4j.compat import long
-
-from pyflink.java_gateway import get_gateway
-
-
-class QueryConfig(object):
-    """
-    The :class:`QueryConfig` holds parameters to configure the behavior of queries.
-    """
-
-    __metaclass__ = ABCMeta
-
-    def __init__(self, j_query_config):
-        self._j_query_config = j_query_config
-
-
-class StreamQueryConfig(QueryConfig):
-    """
-    The :class:`StreamQueryConfig` holds parameters to configure the behavior of streaming queries.
-
-    Example:
-    ::
-
-        >>> query_config = StreamQueryConfig() \\
-        ...     .with_idle_state_retention_time(datetime.timedelta(days=1),
-        ...                                     datetime.timedelta(days=3))
-        >>> table_env.sql_update("...", query_config)
-
-    """
-
-    def __init__(self, j_stream_query_config=None):
-        if j_stream_query_config is not None:
-            self._j_stream_query_config = j_stream_query_config
-        else:
-            self._j_stream_query_config = get_gateway().jvm.StreamQueryConfig()
-        super(StreamQueryConfig, self).__init__(self._j_stream_query_config)
-
-    def with_idle_state_retention_time(self, min_time, max_time):
-        """
-        Specifies a minimum and a maximum time interval for how long idle state, i.e., state which
-        was not updated, will be retained.
-
-        State will never be cleared until it was idle for less than the minimum time and will never
-        be kept if it was idle for more than the maximum time.
-
-        When new data arrives for previously cleaned-up state, the new data will be handled as if it
-        was the first data. This can result in previous results being overwritten.
-
-        Set to 0 (zero) to never clean-up the state.
-
-        .. note::
-
-            Cleaning up state requires additional bookkeeping which becomes less expensive for
-            larger differences of minTime and maxTime. The difference between minTime and maxTime
-            must be at least 5 minutes.
-
-        :param min_time: The minimum time interval for which idle state is retained. Set to
-                         0 (zero) to never clean-up the state.
-        :param max_time: The maximum time interval for which idle state is retained. Must be at
-                         least 5 minutes greater than minTime. Set to
-                         0 (zero) to never clean-up the state.
-        :return: :class:`StreamQueryConfig`
-        """
-        #  type: (timedelta, timedelta) -> StreamQueryConfig
-        j_time_class = get_gateway().jvm.org.apache.flink.api.common.time.Time
-        j_min_time = j_time_class.milliseconds(long(round(min_time.total_seconds() * 1000)))
-        j_max_time = j_time_class.milliseconds(long(round(max_time.total_seconds() * 1000)))
-        self._j_stream_query_config = \
-            self._j_stream_query_config.withIdleStateRetentionTime(j_min_time, j_max_time)
-        return self
-
-    def get_min_idle_state_retention_time(self):
-        """
-        State might be cleared and removed if it was not updated for the defined period of time.
-
-        :return: The minimum time until state which was not updated will be retained.
-        """
-        #  type: () -> int
-        return self._j_stream_query_config.getMinIdleStateRetentionTime()
-
-    def get_max_idle_state_retention_time(self):
-        """
-        State will be cleared and removed if it was not updated for the defined period of time.
-
-        :return: The maximum time until state which was not updated will be retained.
-        """
-        #  type: () -> int
-        return self._j_stream_query_config.getMaxIdleStateRetentionTime()
-
-
-class BatchQueryConfig(QueryConfig):
-    """
-    The :class:`BatchQueryConfig` holds parameters to configure the behavior of batch queries.
-    """
-
-    def __init__(self, j_batch_query_config=None):
-        self._jvm = get_gateway().jvm
-        if j_batch_query_config is not None:
-            self._j_batch_query_config = j_batch_query_config
-        else:
-            self._j_batch_query_config = self._jvm.BatchQueryConfig()
-        super(BatchQueryConfig, self).__init__(self._j_batch_query_config)
diff --git a/flink-python/pyflink/table/table_config.py b/flink-python/pyflink/table/table_config.py
index d6b5864..8e0fbec 100644
--- a/flink-python/pyflink/table/table_config.py
+++ b/flink-python/pyflink/table/table_config.py
@@ -17,6 +17,7 @@
 ################################################################################
 import sys
 
+from py4j.compat import long
 from pyflink.java_gateway import get_gateway
 
 __all__ = ['TableConfig']
@@ -30,11 +31,12 @@ class TableConfig(object):
     A config to define the runtime behavior of the Table API.
     """
 
-    def __init__(self):
-        self._jvm = get_gateway().jvm
-        self._j_table_config = self._jvm.TableConfig()
-        self._is_stream = None  # type: bool
-        self._parallelism = None  # type: int
+    def __init__(self, j_table_config=None):
+        gateway = get_gateway()
+        if j_table_config is None:
+            self._j_table_config = gateway.jvm.TableConfig()
+        else:
+            self._j_table_config = j_table_config
 
     def get_timezone(self):
         """
@@ -52,7 +54,7 @@ class TableConfig(object):
                             "GMT-8:00".
         """
         if timezone_id is not None and isinstance(timezone_id, (str, unicode)):
-            j_timezone = self._jvm.java.util.TimeZone.getTimeZone(timezone_id)
+            j_timezone = get_gateway().jvm.java.util.TimeZone.getTimeZone(timezone_id)
             self._j_table_config.setTimeZone(j_timezone)
         else:
             raise Exception("TableConfig.timezone should be a string!")
@@ -90,3 +92,144 @@ class TableConfig(object):
             self._j_table_config.setMaxGeneratedCodeLength(max_generated_code_length)
         else:
             raise Exception("TableConfig.max_generated_code_length should be a int value!")
+
+    def set_idle_state_retention_time(self, min_time, max_time):
+        """
+        Specifies a minimum and a maximum time interval for how long idle state, i.e., state which
+        was not updated, will be retained.
+
+        State will never be cleared until it has been idle for at least the minimum time and will
+        never be kept if it was idle for more than the maximum time.
+
+        When new data arrives for previously cleaned-up state, the new data will be handled as if it
+        was the first data. This can result in previous results being overwritten.
+
+        Set to 0 (zero) to never clean-up the state.
+
+        Example:
+        ::
+
+            >>> table_config = TableConfig() \\
+            ...     .set_idle_state_retention_time(datetime.timedelta(days=1),
+            ...                                    datetime.timedelta(days=3))
+
+        .. note::
+
+            Cleaning up state requires additional bookkeeping which becomes less expensive for
+            larger differences of min_time and max_time. The difference between min_time and
+            max_time must be at least 5 minutes.
+
+        :param min_time: The minimum time interval for which idle state is retained. Set to
+                         0 (zero) to never clean-up the state.
+        :type min_time: datetime.timedelta
+        :param max_time: The maximum time interval for which idle state is retained. Must be at
+                         least 5 minutes greater than min_time. Set to
+                         0 (zero) to never clean-up the state.
+        :type max_time: datetime.timedelta
+        """
+        j_time_class = get_gateway().jvm.org.apache.flink.api.common.time.Time
+        j_min_time = j_time_class.milliseconds(long(round(min_time.total_seconds() * 1000)))
+        j_max_time = j_time_class.milliseconds(long(round(max_time.total_seconds() * 1000)))
+        self._j_table_config.setIdleStateRetentionTime(j_min_time, j_max_time)
+
+    def get_min_idle_state_retention_time(self):
+        """
+        State might be cleared and removed if it was not updated for the defined period of time.
+
+        :return: The minimum time in milliseconds for which state that was not updated is retained.
+        :rtype: int
+        """
+        return self._j_table_config.getMinIdleStateRetentionTime()
+
+    def get_max_idle_state_retention_time(self):
+        """
+        State will be cleared and removed if it was not updated for the defined period of time.
+
+        :return: The maximum time in milliseconds for which state that was not updated is retained.
+        :rtype: int
+        """
+        return self._j_table_config.getMaxIdleStateRetentionTime()
+
+    def set_decimal_context(self, precision, rounding_mode):
+        """
+        Sets the default context for decimal division calculation.
+        (precision=34, rounding_mode=HALF_EVEN) by default.
+
+        The precision is the number of digits to be used for an operation. A value of 0 indicates
+        that unlimited precision (as many digits as are required) will be used. Note that leading
+        zeros (in the coefficient of a number) are never significant.
+
+        The rounding mode is the rounding algorithm to be used for an operation. It could be:
+
+        **UP**, **DOWN**, **CEILING**, **FLOOR**, **HALF_UP**, **HALF_DOWN**, **HALF_EVEN**,
+        **UNNECESSARY**
+
+        The table below shows the results of rounding input to one digit with the given rounding
+        mode:
+
+        +-------+----+------+---------+-------+---------+-----------+-----------+-------------+
+        | Input | UP | DOWN | CEILING | FLOOR | HALF_UP | HALF_DOWN | HALF_EVEN | UNNECESSARY |
+        +=======+====+======+=========+=======+=========+===========+===========+=============+
+        | 5.5   |  6 |   5  |    6    |   5   |    6    |     5     |     6     |  Exception  |
+        +-------+----+------+---------+-------+---------+-----------+-----------+-------------+
+        | 2.5   |  3 |   2  |    3    |   2   |    3    |     2     |     2     |  Exception  |
+        +-------+----+------+---------+-------+---------+-----------+-----------+-------------+
+        | 1.6   |  2 |   1  |    2    |   1   |    2    |     2     |     2     |  Exception  |
+        +-------+----+------+---------+-------+---------+-----------+-----------+-------------+
+        | 1.1   |  2 |   1  |    2    |   1   |    1    |     1     |     1     |  Exception  |
+        +-------+----+------+---------+-------+---------+-----------+-----------+-------------+
+        | 1.0   |  1 |   1  |    1    |   1   |    1    |     1     |     1     |      1      |
+        +-------+----+------+---------+-------+---------+-----------+-----------+-------------+
+        | -1.0  | -1 |  -1  |   -1    |  -1   |   -1    |    -1     |    -1     |     -1      |
+        +-------+----+------+---------+-------+---------+-----------+-----------+-------------+
+        | -1.1  | -2 |  -1  |   -1    |  -2   |   -1    |    -1     |    -1     |  Exception  |
+        +-------+----+------+---------+-------+---------+-----------+-----------+-------------+
+        | -1.6  | -2 |  -1  |   -1    |  -2   |   -2    |    -2     |    -2     |  Exception  |
+        +-------+----+------+---------+-------+---------+-----------+-----------+-------------+
+        | -2.5  | -3 |  -2  |   -2    |  -3   |   -3    |    -2     |    -2     |  Exception  |
+        +-------+----+------+---------+-------+---------+-----------+-----------+-------------+
+        | -5.5  | -6 |  -5  |   -5    |  -6   |   -6    |    -5     |    -6     |  Exception  |
+        +-------+----+------+---------+-------+---------+-----------+-----------+-------------+
+
+        :param precision: The precision of the decimal context.
+        :type precision: int
+        :param rounding_mode: The rounding mode of the decimal context.
+        :type rounding_mode: str
+        """
+        if rounding_mode not in (
+                "UP",
+                "DOWN",
+                "CEILING",
+                "FLOOR",
+                "HALF_UP",
+                "HALF_DOWN",
+                "HALF_EVEN",
+                "UNNECESSARY"):
+            raise ValueError("Unsupported rounding_mode: %s" % rounding_mode)
+        gateway = get_gateway()
+        j_rounding_mode = getattr(gateway.jvm.java.math.RoundingMode, rounding_mode)
+        j_math_context = gateway.jvm.java.math.MathContext(precision, j_rounding_mode)
+        self._j_table_config.setDecimalContext(j_math_context)
+
+    def get_decimal_context(self):
+        """
+        Returns current context for decimal division calculation,
+        (precision=34, rounding_mode=HALF_EVEN) by default.
+
+        .. seealso:: :func:`set_decimal_context`
+
+        :return: the current context for decimal division calculation.
+        :rtype: (int, str)
+        """
+        j_math_context = self._j_table_config.getDecimalContext()
+        precision = j_math_context.getPrecision()
+        rounding_mode = j_math_context.getRoundingMode().name()
+        return precision, rounding_mode
+
+    @staticmethod
+    def get_default():
+        """
+        :return: A TableConfig object with default settings.
+        :rtype: TableConfig
+        """
+        return TableConfig(get_gateway().jvm.TableConfig.getDefault())
diff --git a/flink-python/pyflink/table/tests/test_table_config_completeness.py b/flink-python/pyflink/table/tests/test_table_config_completeness.py
new file mode 100644
index 0000000..9db60cf
--- /dev/null
+++ b/flink-python/pyflink/table/tests/test_table_config_completeness.py
@@ -0,0 +1,58 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import unittest
+
+from pyflink.testing.test_case_utils import PythonAPICompletenessTestCase
+from pyflink.table import TableConfig
+
+
+class TableConfigCompletenessTests(PythonAPICompletenessTestCase, unittest.TestCase):
+    """
+    Tests whether the Python :class:`TableConfig` is consistent with
+    Java `org.apache.flink.table.api.TableConfig`.
+    """
+
+    @classmethod
+    def python_class(cls):
+        return TableConfig
+
+    @classmethod
+    def java_class(cls):
+        return "org.apache.flink.table.api.TableConfig"
+
+    @classmethod
+    def excluded_methods(cls):
+        # internal interfaces, no need to expose to users.
+        return {'getPlannerConfig', 'setPlannerConfig'}
+
+    @classmethod
+    def java_method_name(cls, python_method_name):
+        # Most time zone related libraries in Python use 'timezone' instead of 'time_zone'.
+        return {'get_timezone': 'get_time_zone',
+                'set_timezone': 'set_time_zone'}.get(python_method_name, python_method_name)
+
+if __name__ == '__main__':
+    import unittest
+
+    try:
+        import xmlrunner
+        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports')
+    except ImportError:
+        testRunner = None
+    unittest.main(testRunner=testRunner, verbosity=2)
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py
index 3e43d5f..f85d6af 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -22,7 +22,6 @@ from py4j.compat import unicode
 
 from pyflink.dataset import ExecutionEnvironment
 from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.table import StreamQueryConfig
 from pyflink.table.table_environment import BatchTableEnvironment, StreamTableEnvironment
 from pyflink.table.table_config import TableConfig
 from pyflink.table.types import DataTypes, RowType
@@ -137,14 +136,31 @@ class StreamTableEnvironmentTests(PyFlinkStreamTableTestCase):
         expected = ['1,Hi,Hello', '2,Hello,Hello']
         self.assert_equals(actual, expected)
 
-    def test_query_config(self):
-        query_config = StreamQueryConfig()
+    def test_table_config(self):
 
-        query_config.with_idle_state_retention_time(
+        table_config = TableConfig.get_default()
+        table_config.set_idle_state_retention_time(
             datetime.timedelta(days=1), datetime.timedelta(days=2))
 
-        self.assertEqual(2 * 24 * 3600 * 1000, query_config.get_max_idle_state_retention_time())
-        self.assertEqual(24 * 3600 * 1000, query_config.get_min_idle_state_retention_time())
+        self.assertEqual(2 * 24 * 3600 * 1000, table_config.get_max_idle_state_retention_time())
+        self.assertEqual(24 * 3600 * 1000, table_config.get_min_idle_state_retention_time())
+
+        table_config.set_decimal_context(20, "UNNECESSARY")
+        self.assertEqual((20, "UNNECESSARY"), table_config.get_decimal_context())
+        table_config.set_decimal_context(20, "HALF_EVEN")
+        self.assertEqual((20, "HALF_EVEN"), table_config.get_decimal_context())
+        table_config.set_decimal_context(20, "HALF_DOWN")
+        self.assertEqual((20, "HALF_DOWN"), table_config.get_decimal_context())
+        table_config.set_decimal_context(20, "HALF_UP")
+        self.assertEqual((20, "HALF_UP"), table_config.get_decimal_context())
+        table_config.set_decimal_context(20, "FLOOR")
+        self.assertEqual((20, "FLOOR"), table_config.get_decimal_context())
+        table_config.set_decimal_context(20, "CEILING")
+        self.assertEqual((20, "CEILING"), table_config.get_decimal_context())
+        table_config.set_decimal_context(20, "DOWN")
+        self.assertEqual((20, "DOWN"), table_config.get_decimal_context())
+        table_config.set_decimal_context(20, "UP")
+        self.assertEqual((20, "UP"), table_config.get_decimal_context())
 
     def test_create_table_environment(self):
         table_config = TableConfig()