Posted to commits@flink.apache.org by di...@apache.org on 2020/09/08 02:06:28 UTC
[flink] branch master updated: [FLINK-19131][python] Add support of Python 3.8 in PyFlink
This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c1a12e9 [FLINK-19131][python] Add support of Python 3.8 in PyFlink
c1a12e9 is described below
commit c1a12e925b6ef46ad5cf0e0a5723949572550e9b
Author: sunjincheng121 <su...@gmail.com>
AuthorDate: Fri Sep 4 17:32:37 2020 +0800
[FLINK-19131][python] Add support of Python 3.8 in PyFlink
This closes #13334.
---
docs/_includes/generated/python_configuration.html | 2 +-
docs/dev/python/installation.md | 4 +-
docs/dev/python/installation.zh.md | 4 +-
.../table-api-users-guide/udfs/python_udfs.md | 2 +-
.../table-api-users-guide/udfs/python_udfs.zh.md | 2 +-
.../udfs/vectorized_python_udfs.md | 2 +-
.../udfs/vectorized_python_udfs.zh.md | 2 +-
docs/dev/table/sqlClient.md | 2 +-
docs/dev/table/sqlClient.zh.md | 2 +-
docs/flinkDev/building.md | 4 +-
docs/flinkDev/building.zh.md | 4 +-
docs/ops/cli.md | 8 +-
docs/ops/cli.zh.md | 8 +-
.../apache/flink/client/cli/CliFrontendParser.java | 2 +-
flink-python/README.md | 2 +-
flink-python/dev/dev-requirements.txt | 2 +-
flink-python/dev/lint-python.sh | 6 +-
.../datastream/stream_execution_environment.py | 2 +-
.../pyflink/fn_execution/beam/beam_boot.py | 10 +-
.../fn_execution/beam/beam_operations_fast.pyx | 7 +-
.../fn_execution/beam/beam_operations_slow.py | 7 +-
.../fn_execution/tests/test_process_mode_boot.py | 11 -
flink-python/pyflink/table/table_config.py | 2 +-
flink-python/setup.py | 9 +-
.../fnexecution/state/GrpcStateService.java | 4 +-
.../grpc/v1p21p0/io/netty/buffer/PoolArena.java | 818 ---------------------
.../v1p21p0/io/netty/buffer/PoolThreadCache.java | 508 -------------
.../io/netty/buffer/PooledByteBufAllocator.java | 640 ----------------
...eamDataStreamPythonStatelessFunctionRunner.java | 2 +-
.../org/apache/flink/python/PythonOptions.java | 2 +-
.../flink/python/metric/FlinkMetricContainer.java | 11 +-
.../python/beam/BeamPythonFunctionRunner.java | 5 +-
.../beam/BeamPythonStatelessFunctionRunner.java | 26 +-
.../BeamTablePythonStatelessFunctionRunner.java | 2 +-
flink-python/src/main/resources/META-INF/NOTICE | 78 +-
.../python/metric/FlinkMetricContainerTest.java | 53 +-
.../PassThroughPythonScalarFunctionRunner.java | 2 +-
.../PassThroughPythonTableFunctionRunner.java | 2 +-
flink-python/tox.ini | 4 +-
pom.xml | 4 +-
40 files changed, 147 insertions(+), 2120 deletions(-)
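The doc changes below all extend the supported interpreter set from {3.5, 3.6, 3.7} to include 3.8. As an illustrative aside (not part of the commit), the kind of version check the updated installation docs describe could be sketched as:

```python
import sys

# Illustrative only: accept the interpreter versions the updated docs list,
# i.e. 3.5 through 3.8 inclusive.
SUPPORTED = {(3, 5), (3, 6), (3, 7), (3, 8)}

def is_supported(version_info=sys.version_info):
    # Compare only major.minor, as the docs do.
    return (version_info[0], version_info[1]) in SUPPORTED

ok = is_supported((3, 8, 0))    # newly supported by this commit
bad = is_supported((2, 7, 18))  # Python 2 was never supported by PyFlink
```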
diff --git a/docs/_includes/generated/python_configuration.html b/docs/_includes/generated/python_configuration.html
index 890d025..00406f1 100644
--- a/docs/_includes/generated/python_configuration.html
+++ b/docs/_includes/generated/python_configuration.html
@@ -24,7 +24,7 @@
<td><h5>python.executable</h5></td>
<td style="word-wrap: break-word;">"python"</td>
<td>String</td>
- <td>Specify the path of the python interpreter used to execute the python UDF worker. The python UDF worker depends on Python 3.5+, Apache Beam (version == 2.19.0), Pip (version >= 7.1.0) and SetupTools (version >= 37.0.0). Please ensure that the specified environment meets the above requirements. The option is equivalent to the command line option "-pyexec".</td>
+ <td>Specify the path of the python interpreter used to execute the python UDF worker. The python UDF worker depends on Python 3.5+, Apache Beam (version == 2.23.0), Pip (version >= 7.1.0) and SetupTools (version >= 37.0.0). Please ensure that the specified environment meets the above requirements. The option is equivalent to the command line option "-pyexec".</td>
</tr>
<tr>
<td><h5>python.files</h5></td>
diff --git a/docs/dev/python/installation.md b/docs/dev/python/installation.md
index 82c3737..7bbcd74 100644
--- a/docs/dev/python/installation.md
+++ b/docs/dev/python/installation.md
@@ -26,11 +26,11 @@ under the License.
{:toc}
## Environment Requirements
-<span class="label label-info">Note</span> Python version (3.5, 3.6 or 3.7) is required for PyFlink. Please run the following command to make sure that it meets the requirements:
+<span class="label label-info">Note</span> Python version (3.5, 3.6, 3.7 or 3.8) is required for PyFlink. Please run the following command to make sure that it meets the requirements:
{% highlight bash %}
$ python --version
-# the version printed here must be 3.5, 3.6 or 3.7
+# the version printed here must be 3.5, 3.6, 3.7 or 3.8
{% endhighlight %}
## Installation of PyFlink
diff --git a/docs/dev/python/installation.zh.md b/docs/dev/python/installation.zh.md
index 3df0dc9..8a0209a 100644
--- a/docs/dev/python/installation.zh.md
+++ b/docs/dev/python/installation.zh.md
@@ -26,11 +26,11 @@ under the License.
{:toc}
## 环境要求
-<span class="label label-info">注意</span> PyFlink需要特定的Python版本(3.5, 3.6 或 3.7)。请运行以下命令,以确保Python版本满足要求。
+<span class="label label-info">注意</span> PyFlink需要特定的Python版本(3.5, 3.6, 3.7 或 3.8)。请运行以下命令,以确保Python版本满足要求。
{% highlight bash %}
$ python --version
-# the version printed here must be 3.5, 3.6 or 3.7
+# the version printed here must be 3.5, 3.6, 3.7 or 3.8
{% endhighlight %}
## PyFlink 安装
diff --git a/docs/dev/python/table-api-users-guide/udfs/python_udfs.md b/docs/dev/python/table-api-users-guide/udfs/python_udfs.md
index 84f661e..901ba5a 100644
--- a/docs/dev/python/table-api-users-guide/udfs/python_udfs.md
+++ b/docs/dev/python/table-api-users-guide/udfs/python_udfs.md
@@ -25,7 +25,7 @@ under the License.
User-defined functions are important features, because they significantly extend the expressiveness of Python Table API programs.
-**NOTE:** Python UDF execution requires Python version (3.5, 3.6 or 3.7) with PyFlink installed. It's required on both the client side and the cluster side.
+**NOTE:** Python UDF execution requires Python version (3.5, 3.6, 3.7 or 3.8) with PyFlink installed. It's required on both the client side and the cluster side.
* This will be replaced by the TOC
{:toc}
diff --git a/docs/dev/python/table-api-users-guide/udfs/python_udfs.zh.md b/docs/dev/python/table-api-users-guide/udfs/python_udfs.zh.md
index 94ad4f1..eaf38c9 100644
--- a/docs/dev/python/table-api-users-guide/udfs/python_udfs.zh.md
+++ b/docs/dev/python/table-api-users-guide/udfs/python_udfs.zh.md
@@ -25,7 +25,7 @@ under the License.
用户自定义函数是重要的功能,因为它们极大地扩展了Python Table API程序的表达能力。
-**注意:** 要执行Python用户自定义函数,客户端和集群端都需要安装Python版本3.5、3.6或3.7,并安装PyFlink。
+**注意:** 要执行Python用户自定义函数,客户端和集群端都需要安装Python版本(3.5、3.6、3.7 或 3.8),并安装PyFlink。
* This will be replaced by the TOC
{:toc}
diff --git a/docs/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.md b/docs/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.md
index 8337f69..18c60d0 100644
--- a/docs/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.md
+++ b/docs/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.md
@@ -30,7 +30,7 @@ These Python libraries are highly optimized and provide high-performance data st
[non-vectorized user-defined functions]({% link dev/python/table-api-users-guide/udfs/python_udfs.md %}) on how to define vectorized user-defined functions.
Users only need to add an extra parameter `udf_type="pandas"` in the decorator `udf` to mark it as a vectorized user-defined function.
-**NOTE:** Python UDF execution requires Python version (3.5, 3.6 or 3.7) with PyFlink installed. It's required on both the client side and the cluster side.
+**NOTE:** Python UDF execution requires Python version (3.5, 3.6, 3.7 or 3.8) with PyFlink installed. It's required on both the client side and the cluster side.
* This will be replaced by the TOC
{:toc}
diff --git a/docs/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.zh.md b/docs/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.zh.md
index 375d084..a03474f 100644
--- a/docs/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.zh.md
+++ b/docs/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.zh.md
@@ -29,7 +29,7 @@ under the License.
向量化用户自定义函数的定义,与[非向量化用户自定义函数]({% link dev/python/table-api-users-guide/udfs/python_udfs.zh.md %})具有相似的方式,
用户只需要在调用`udf`装饰器时添加一个额外的参数`udf_type="pandas"`,将其标记为一个向量化用户自定义函数即可。
-**注意:**要执行Python UDF,需要安装PyFlink的Python版本(3.5、3.6或3.7)。客户端和群集端都需要安装它。
+**注意:**要执行Python UDF,需要安装PyFlink的Python版本(3.5、3.6、3.7 或 3.8)。客户端和群集端都需要安装它。
* This will be replaced by the TOC
{:toc}
diff --git a/docs/dev/table/sqlClient.md b/docs/dev/table/sqlClient.md
index 4acecbe..1818728 100644
--- a/docs/dev/table/sqlClient.md
+++ b/docs/dev/table/sqlClient.md
@@ -222,7 +222,7 @@ Mode "embedded" submits Flink jobs from the local machine.
--pyExecutable
/usr/local/bin/python3). The python
UDF worker depends on Python 3.5+,
- Apache Beam (version == 2.19.0), Pip
+ Apache Beam (version == 2.23.0), Pip
(version >= 7.1.0) and SetupTools
(version >= 37.0.0). Please ensure
that the specified environment meets
diff --git a/docs/dev/table/sqlClient.zh.md b/docs/dev/table/sqlClient.zh.md
index 69df6f0..a25ce3a 100644
--- a/docs/dev/table/sqlClient.zh.md
+++ b/docs/dev/table/sqlClient.zh.md
@@ -223,7 +223,7 @@ Mode "embedded" submits Flink jobs from the local machine.
--pyExecutable
/usr/local/bin/python3). The python
UDF worker depends on Python 3.5+,
- Apache Beam (version == 2.19.0), Pip
+ Apache Beam (version == 2.23.0), Pip
(version >= 7.1.0) and SetupTools
(version >= 37.0.0). Please ensure
that the specified environment meets
diff --git a/docs/flinkDev/building.md b/docs/flinkDev/building.md
index 017e5cd..421ff07 100644
--- a/docs/flinkDev/building.md
+++ b/docs/flinkDev/building.md
@@ -64,11 +64,11 @@ mvn clean install -DskipTests -Dfast
If you want to build a PyFlink package that can be used for pip installation, you need to build the Flink project first, as described in [Build Flink](#build-flink).
-2. Python version(3.5, 3.6 or 3.7) is required
+2. Python version(3.5, 3.6, 3.7 or 3.8) is required
```shell
$ python --version
- # the version printed here must be 3.5, 3.6 or 3.7
+ # the version printed here must be 3.5, 3.6, 3.7 or 3.8
```
3. Build PyFlink with Cython extension support (optional)
diff --git a/docs/flinkDev/building.zh.md b/docs/flinkDev/building.zh.md
index 22ae108..9511b93 100644
--- a/docs/flinkDev/building.zh.md
+++ b/docs/flinkDev/building.zh.md
@@ -65,11 +65,11 @@ mvn clean install -DskipTests -Dfast
如果想构建一个可用于 pip 安装的 PyFlink 包,需要先构建 Flink 工程,如 [构建 Flink](#build-flink) 中所述。
-2. Python 的版本为 3.5, 3.6 或者 3.7.
+2. Python 的版本为 3.5, 3.6, 3.7 或者 3.8.
```shell
$ python --version
- # the version printed here must be 3.5, 3.6 or 3.7
+ # the version printed here must be 3.5, 3.6, 3.7 or 3.8
```
3. 构建 PyFlink 的 Cython 扩展模块(可选的)
diff --git a/docs/ops/cli.md b/docs/ops/cli.md
index aad9a78..11e7151 100644
--- a/docs/ops/cli.md
+++ b/docs/ops/cli.md
@@ -118,11 +118,11 @@ These examples about how to submit a job in CLI.
<div data-lang="python" markdown="1">
-<span class="label label-info">Note</span> When submitting Python job via `flink run`, Flink will run the command “python”. Please run the following command to confirm that the command “python” in current environment points to a specified Python version 3.5, 3.6 or 3.7:
+<span class="label label-info">Note</span> When submitting Python job via `flink run`, Flink will run the command “python”. Please run the following command to confirm that the command “python” in current environment points to a specified Python version 3.5, 3.6, 3.7 or 3.8:
{% highlight bash %}
$ python --version
-# the version printed here must be 3.5, 3.6 or 3.7
+# the version printed here must be 3.5, 3.6, 3.7 or 3.8
{% endhighlight %}
- Run Python Table program:
@@ -374,8 +374,8 @@ Action "run" compiles and runs a program.
UDF worker (e.g.: --pyExecutable
/usr/local/bin/python3). The python
UDF worker depends on a specified Python
- version 3.5, 3.6 or 3.7, Apache Beam
- (version == 2.19.0), Pip (version >= 7.1.0)
+ version 3.5, 3.6 3.7 or 3.8, Apache Beam
+ (version == 2.23.0), Pip (version >= 7.1.0)
and SetupTools (version >= 37.0.0).
Please ensure that the specified environment
meets the above requirements.
diff --git a/docs/ops/cli.zh.md b/docs/ops/cli.zh.md
index 5739227..81b1307 100644
--- a/docs/ops/cli.zh.md
+++ b/docs/ops/cli.zh.md
@@ -118,11 +118,11 @@ option.
<div data-lang="python" markdown="1">
-<span class="label label-info">注意</span> 通过`flink run`提交Python任务时Flink会调用“python”命令。请执行以下命令以确认当前环境下的指令“python”指向Python的版本为3.5, 3.6 或者 3.7中的一个:
+<span class="label label-info">注意</span> 通过`flink run`提交Python任务时Flink会调用“python”命令。请执行以下命令以确认当前环境下的指令“python”指向Python的版本为3.5、3.6、3.7 或 3.8 中的一个:
{% highlight bash %}
$ python --version
-# the version printed here must be 3.5, 3.6 or 3.7
+# the version printed here must be 3.5, 3.6, 3.7 or 3.8
{% endhighlight %}
- 提交一个Python Table的作业:
@@ -373,8 +373,8 @@ Action "run" compiles and runs a program.
UDF worker (e.g.: --pyExecutable
/usr/local/bin/python3). The python
UDF worker depends on a specified Python
- version 3.5, 3.6 or 3.7, Apache Beam
- (version == 2.19.0), Pip (version >= 7.1.0)
+ version 3.5, 3.6 3.7 or 3.8, Apache Beam
+ (version == 2.23.0), Pip (version >= 7.1.0)
and SetupTools (version >= 37.0.0).
Please ensure that the specified environment
meets the above requirements.
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index 10f79ad..0d82334 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -154,7 +154,7 @@ public class CliFrontendParser {
public static final Option PYEXEC_OPTION = new Option("pyexec", "pyExecutable", true,
"Specify the path of the python interpreter used to execute the python UDF worker " +
"(e.g.: --pyExecutable /usr/local/bin/python3). " +
- "The python UDF worker depends on Python 3.5+, Apache Beam (version == 2.19.0), " +
+ "The python UDF worker depends on Python 3.5+, Apache Beam (version == 2.23.0), " +
"Pip (version >= 7.1.0) and SetupTools (version >= 37.0.0). " +
"Please ensure that the specified environment meets the above requirements.");
diff --git a/flink-python/README.md b/flink-python/README.md
index ea7bb16..ec87028 100644
--- a/flink-python/README.md
+++ b/flink-python/README.md
@@ -16,7 +16,7 @@ The auto-generated Python docs can be found at [https://ci.apache.org/projects/f
## Python Requirements
-Apache Flink Python API depends on Py4J (currently version 0.10.8.1), CloudPickle (currently version 1.2.2), python-dateutil(currently version 2.8.0), Apache Beam (currently version 2.19.0) and jsonpickle (currently 1.2).
+Apache Flink Python API depends on Py4J (currently version 0.10.8.1), CloudPickle (currently version 1.2.2), python-dateutil(currently version 2.8.0), Apache Beam (currently version 2.23.0) and jsonpickle (currently 1.2).
## Development Notices
diff --git a/flink-python/dev/dev-requirements.txt b/flink-python/dev/dev-requirements.txt
index 5da68b2..08faa45 100755
--- a/flink-python/dev/dev-requirements.txt
+++ b/flink-python/dev/dev-requirements.txt
@@ -14,5 +14,5 @@
# limitations under the License.
setuptools>=18.0
wheel
-apache-beam==2.19.0
+apache-beam==2.23.0
cython==0.29.16
diff --git a/flink-python/dev/lint-python.sh b/flink-python/dev/lint-python.sh
index cb65ff8..d7ccbab 100755
--- a/flink-python/dev/lint-python.sh
+++ b/flink-python/dev/lint-python.sh
@@ -219,7 +219,7 @@ function install_miniconda() {
# Install some kinds of py env.
function install_py_env() {
- py_env=("3.5" "3.6" "3.7")
+ py_env=("3.5" "3.6" "3.7" "3.8")
for ((i=0;i<${#py_env[@]};i++)) do
if [ -d "$CURRENT_DIR/.conda/envs/${py_env[i]}" ]; then
rm -rf "$CURRENT_DIR/.conda/envs/${py_env[i]}"
@@ -357,7 +357,7 @@ function install_environment() {
print_function "STEP" "install miniconda... [SUCCESS]"
# step-3 install python environment whcih includes
- # 3.5 3.6 3.7
+ # 3.5 3.6 3.7 3.8
if [ $STEP -lt 3 ] && [ `need_install_component "py_env"` = true ]; then
print_function "STEP" "installing python environment..."
install_py_env
@@ -696,7 +696,7 @@ usage: $0 [options]
-l list all checks supported.
Examples:
./lint-python -s basic => install environment with basic components.
- ./lint-python -s py_env => install environment with python env(3.5,3.6,3.7).
+ ./lint-python -s py_env => install environment with python env(3.5,3.6,3.7,3.8).
./lint-python -s all => install environment with all components such as python env,tox,flake8,sphinx etc.
./lint-python -s tox,flake8 => install environment with tox,flake8.
./lint-python -s tox -f => reinstall environment with tox.
diff --git a/flink-python/pyflink/datastream/stream_execution_environment.py b/flink-python/pyflink/datastream/stream_execution_environment.py
index 099f0ef..c25a7db 100644
--- a/flink-python/pyflink/datastream/stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/stream_execution_environment.py
@@ -543,7 +543,7 @@ class StreamExecutionEnvironment(object):
.. note::
- The python udf worker depends on Apache Beam (version == 2.19.0).
+ The python udf worker depends on Apache Beam (version == 2.23.0).
Please ensure that the specified environment meets the above requirements.
:param python_exec: The path of python interpreter.
diff --git a/flink-python/pyflink/fn_execution/beam/beam_boot.py b/flink-python/pyflink/fn_execution/beam/beam_boot.py
index f460ae6..b0c42dc 100644
--- a/flink-python/pyflink/fn_execution/beam/beam_boot.py
+++ b/flink-python/pyflink/fn_execution/beam/beam_boot.py
@@ -58,27 +58,19 @@ if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--id", default="", help="Local identifier (required).")
- parser.add_argument("--logging_endpoint", default="",
- help="Logging endpoint (required).")
parser.add_argument("--provision_endpoint", default="",
help="Provision endpoint (required).")
- parser.add_argument("--control_endpoint", default="",
- help="Control endpoint (required).")
parser.add_argument("--semi_persist_dir", default="/tmp",
help="Local semi-persistent directory (optional).")
args = parser.parse_known_args()[0]
worker_id = args.id
- logging_endpoint = args.logging_endpoint
provision_endpoint = args.provision_endpoint
- control_endpoint = args.control_endpoint
semi_persist_dir = args.semi_persist_dir
check_not_empty(worker_id, "No id provided.")
- check_not_empty(logging_endpoint, "No logging endpoint provided.")
check_not_empty(provision_endpoint, "No provision endpoint provided.")
- check_not_empty(control_endpoint, "No control endpoint provided.")
logging.info("Initializing python harness: %s" % " ".join(sys.argv))
@@ -89,6 +81,8 @@ if __name__ == "__main__":
client = ProvisionServiceStub(channel=channel)
info = client.GetProvisionInfo(GetProvisionInfoRequest(), metadata=metadata).info
options = json_format.MessageToJson(info.pipeline_options)
+ logging_endpoint = info.logging_endpoint.url
+ control_endpoint = info.control_endpoint.url
os.environ["WORKER_ID"] = worker_id
os.environ["PIPELINE_OPTIONS"] = options
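The beam_boot.py hunk above drops the `--logging_endpoint` and `--control_endpoint` CLI flags: with Beam 2.23 both endpoints come back in the provision response instead. A minimal standalone sketch of the new flow (the dict stands in for the real `ProvisionInfo` message; no gRPC call is made here):

```python
import argparse

def parse_boot_args(argv):
    # After this commit only --id and --provision_endpoint are required flags.
    parser = argparse.ArgumentParser()
    parser.add_argument("--id", default="", help="Local identifier (required).")
    parser.add_argument("--provision_endpoint", default="",
                        help="Provision endpoint (required).")
    parser.add_argument("--semi_persist_dir", default="/tmp",
                        help="Local semi-persistent directory (optional).")
    return parser.parse_known_args(argv)[0]

def endpoints_from_provision_info(info):
    # Mirrors info.logging_endpoint.url / info.control_endpoint.url in the
    # real ProvisionInfo proto; a plain dict stands in for it here.
    return info["logging_endpoint"]["url"], info["control_endpoint"]["url"]

args = parse_boot_args(["--id", "1", "--provision_endpoint", "localhost:1234"])
log_ep, ctl_ep = endpoints_from_provision_info(
    {"logging_endpoint": {"url": "localhost:5678"},
     "control_endpoint": {"url": "localhost:9012"}})
```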
diff --git a/flink-python/pyflink/fn_execution/beam/beam_operations_fast.pyx b/flink-python/pyflink/fn_execution/beam/beam_operations_fast.pyx
index 8e72376..093ca2e 100644
--- a/flink-python/pyflink/fn_execution/beam/beam_operations_fast.pyx
+++ b/flink-python/pyflink/fn_execution/beam/beam_operations_fast.pyx
@@ -108,8 +108,11 @@ cdef class BeamStatelessFunctionOperation(Operation):
str(tag)] = receiver.opcounter.element_counter.value()
return metrics
- cpdef monitoring_infos(self, transform_id):
- # only pass user metric to Java
+ cpdef monitoring_infos(self, transform_id, tag_to_pcollection_id):
+ """
+ Only pass user metric to Java
+ :param tag_to_pcollection_id: useless for user metric
+ """
return self.user_monitoring_infos(transform_id)
cdef void _update_gauge(self, base_metric_group):
diff --git a/flink-python/pyflink/fn_execution/beam/beam_operations_slow.py b/flink-python/pyflink/fn_execution/beam/beam_operations_slow.py
index 4628f3a..9906758 100644
--- a/flink-python/pyflink/fn_execution/beam/beam_operations_slow.py
+++ b/flink-python/pyflink/fn_execution/beam/beam_operations_slow.py
@@ -85,8 +85,11 @@ class StatelessFunctionOperation(Operation):
self._value_coder_impl.encode_to_stream(self.func(o.value), output_stream, True)
output_stream.maybe_flush()
- def monitoring_infos(self, transform_id):
- # only pass user metric to Java
+ def monitoring_infos(self, transform_id, tag_to_pcollection_id):
+ """
+ Only pass user metric to Java
+ :param tag_to_pcollection_id: useless for user metric
+ """
return super().user_monitoring_infos(transform_id)
def generate_func(self, udfs) -> tuple:
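The two hunks above track a Beam 2.23 interface change: the runner now calls `monitoring_infos(transform_id, tag_to_pcollection_id)`. PyFlink's operations accept the extra argument but still forward only user-defined metrics to Java. A toy sketch of that override pattern (class and metric shapes are illustrative, not the real Beam API):

```python
class BaseOperation:
    """Stand-in for Beam's Operation base class."""

    def user_monitoring_infos(self, transform_id):
        # Illustrative payload: one user-scoped counter per transform.
        return {("user", transform_id): 1}

class StatelessFunctionOperation(BaseOperation):
    def monitoring_infos(self, transform_id, tag_to_pcollection_id):
        # tag_to_pcollection_id is deliberately unused: only user metrics
        # are passed back to the Java side.
        return self.user_monitoring_infos(transform_id)

infos = StatelessFunctionOperation().monitoring_infos("t0", {"tag": "pc0"})
```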
diff --git a/flink-python/pyflink/fn_execution/tests/test_process_mode_boot.py b/flink-python/pyflink/fn_execution/tests/test_process_mode_boot.py
index af80ed4..402b09a 100644
--- a/flink-python/pyflink/fn_execution/tests/test_process_mode_boot.py
+++ b/flink-python/pyflink/fn_execution/tests/test_process_mode_boot.py
@@ -101,19 +101,8 @@ class PythonBootTests(PyFlinkTestCase):
args = [self.runner_path, "--id", "1"]
exit_message = subprocess.check_output(args, env=self.env).decode("utf-8")
- self.assertIn("No logging endpoint provided.", exit_message)
-
- args = [self.runner_path, "--id", "1",
- "--logging_endpoint", "localhost:0000"]
- exit_message = subprocess.check_output(args, env=self.env).decode("utf-8")
self.assertIn("No provision endpoint provided.", exit_message)
- args = [self.runner_path, "--id", "1",
- "--logging_endpoint", "localhost:0000",
- "--provision_endpoint", "localhost:%d" % self.provision_port]
- exit_message = subprocess.check_output(args, env=self.env).decode("utf-8")
- self.assertIn("No control endpoint provided.", exit_message)
-
def test_set_working_directory(self):
JProcessPythonEnvironmentManager = \
get_gateway().jvm.org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager
diff --git a/flink-python/pyflink/table/table_config.py b/flink-python/pyflink/table/table_config.py
index 6e0b738..14aab37 100644
--- a/flink-python/pyflink/table/table_config.py
+++ b/flink-python/pyflink/table/table_config.py
@@ -366,7 +366,7 @@ class TableConfig(object):
.. note::
- The python udf worker depends on Apache Beam (version == 2.19.0).
+ The python udf worker depends on Apache Beam (version == 2.23.0).
Please ensure that the specified environment meets the above requirements.
:param python_exec: The path of python interpreter.
diff --git a/flink-python/setup.py b/flink-python/setup.py
index 3ae7258..176e2ac8 100644
--- a/flink-python/setup.py
+++ b/flink-python/setup.py
@@ -314,9 +314,11 @@ run sdist.
author='Apache Software Foundation',
author_email='dev@flink.apache.org',
python_requires='>=3.5',
- install_requires=['py4j==0.10.8.1', 'python-dateutil==2.8.0', 'apache-beam==2.19.0',
+ install_requires=['py4j==0.10.8.1', 'python-dateutil==2.8.0', 'apache-beam==2.23.0',
'cloudpickle==1.2.2', 'avro-python3>=1.8.1,<=1.9.1', 'jsonpickle==1.2',
- 'pandas>=0.23.4,<=0.25.3', 'pyarrow>=0.15.1,<0.16.0', 'pytz>=2018.3'],
+ 'pandas>=0.24.2,<1; python_full_version < "3.5.3"',
+ 'pandas>=0.25.2,<1; python_full_version >= "3.5.3"',
+ 'pyarrow>=0.15.1,<0.18.0', 'pytz>=2018.3'],
cmdclass={'build_ext': build_ext},
tests_require=['pytest==4.4.1'],
description='Apache Flink Python API',
@@ -328,7 +330,8 @@ run sdist.
'License :: OSI Approved :: Apache Software License',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
- 'Programming Language :: Python :: 3.7'],
+ 'Programming Language :: Python :: 3.7',
+ 'Programming Language :: Python :: 3.8'],
ext_modules=extensions
)
finally:
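The setup.py hunk above replaces a single pandas pin with two PEP 508 environment-marker variants keyed on `python_full_version`, so pip selects one pin per interpreter. A stdlib-only sketch of that selection logic (the real evaluation is done by pip, not by this code):

```python
def select_pandas_requirement(python_full_version):
    # Versions must be compared numerically component-by-component,
    # not as strings ("3.10.0" would sort before "3.5.3" lexically).
    def as_tuple(v):
        return tuple(int(part) for part in v.split("."))

    if as_tuple(python_full_version) < as_tuple("3.5.3"):
        return 'pandas>=0.24.2,<1; python_full_version < "3.5.3"'
    return 'pandas>=0.25.2,<1; python_full_version >= "3.5.3"'

old_pin = select_pandas_requirement("3.5.2")
new_pin = select_pandas_requirement("3.8.0")
```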
diff --git a/flink-python/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java b/flink-python/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java
index ed34e34..113748b 100644
--- a/flink-python/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java
+++ b/flink-python/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java
@@ -28,8 +28,8 @@ import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
import org.apache.beam.model.fnexecution.v1.BeamFnStateGrpc;
import org.apache.beam.runners.fnexecution.FnService;
-import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ServerCallStreamObserver;
-import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCallStreamObserver;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.StreamObserver;
/** An implementation of the Beam Fn State service. */
public class GrpcStateService extends BeamFnStateGrpc.BeamFnStateImplBase
diff --git a/flink-python/src/main/java/org/apache/beam/vendor/grpc/v1p21p0/io/netty/buffer/PoolArena.java b/flink-python/src/main/java/org/apache/beam/vendor/grpc/v1p21p0/io/netty/buffer/PoolArena.java
deleted file mode 100644
index c573759..0000000
--- a/flink-python/src/main/java/org/apache/beam/vendor/grpc/v1p21p0/io/netty/buffer/PoolArena.java
+++ /dev/null
@@ -1,818 +0,0 @@
-/*
- * Copyright 2012 The Netty Project
- *
- * The Netty Project licenses this file to you under the Apache License,
- * version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.beam.vendor.grpc.v1p21p0.io.netty.buffer;
-
-import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.LongCounter;
-import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.PlatformDependent;
-import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.StringUtil;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
-import static java.lang.Math.max;
-
-// This class is copied from Netty's io.netty.buffer.PoolArena,
-// can be removed after Beam bumps its shaded netty version to 1.22+ (BEAM-9030).
-//
-// Changed lines: 284, 295, 297~300
-
-abstract class PoolArena<T> implements PoolArenaMetric {
- static final boolean HAS_UNSAFE = PlatformDependent.hasUnsafe();
-
- enum SizeClass {
- Tiny,
- Small,
- Normal
- }
-
- static final int numTinySubpagePools = 512 >>> 4;
-
- final PooledByteBufAllocator parent;
-
- private final int maxOrder;
- final int pageSize;
- final int pageShifts;
- final int chunkSize;
- final int subpageOverflowMask;
- final int numSmallSubpagePools;
- final int directMemoryCacheAlignment;
- final int directMemoryCacheAlignmentMask;
- private final PoolSubpage<T>[] tinySubpagePools;
- private final PoolSubpage<T>[] smallSubpagePools;
-
- private final PoolChunkList<T> q050;
- private final PoolChunkList<T> q025;
- private final PoolChunkList<T> q000;
- private final PoolChunkList<T> qInit;
- private final PoolChunkList<T> q075;
- private final PoolChunkList<T> q100;
-
- private final List<PoolChunkListMetric> chunkListMetrics;
-
- // Metrics for allocations and deallocations
- private long allocationsNormal;
- // We need to use the LongCounter here as this is not guarded via synchronized block.
- private final LongCounter allocationsTiny = PlatformDependent.newLongCounter();
- private final LongCounter allocationsSmall = PlatformDependent.newLongCounter();
- private final LongCounter allocationsHuge = PlatformDependent.newLongCounter();
- private final LongCounter activeBytesHuge = PlatformDependent.newLongCounter();
-
- private long deallocationsTiny;
- private long deallocationsSmall;
- private long deallocationsNormal;
-
- // We need to use the LongCounter here as this is not guarded via synchronized block.
- private final LongCounter deallocationsHuge = PlatformDependent.newLongCounter();
-
- // Number of thread caches backed by this arena.
- final AtomicInteger numThreadCaches = new AtomicInteger();
-
- // TODO: Test if adding padding helps under contention
- //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
-
- protected PoolArena(PooledByteBufAllocator parent, int pageSize,
- int maxOrder, int pageShifts, int chunkSize, int cacheAlignment) {
- this.parent = parent;
- this.pageSize = pageSize;
- this.maxOrder = maxOrder;
- this.pageShifts = pageShifts;
- this.chunkSize = chunkSize;
- directMemoryCacheAlignment = cacheAlignment;
- directMemoryCacheAlignmentMask = cacheAlignment - 1;
- subpageOverflowMask = ~(pageSize - 1);
- tinySubpagePools = newSubpagePoolArray(numTinySubpagePools);
- for (int i = 0; i < tinySubpagePools.length; i ++) {
- tinySubpagePools[i] = newSubpagePoolHead(pageSize);
- }
-
- numSmallSubpagePools = pageShifts - 9;
- smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools);
- for (int i = 0; i < smallSubpagePools.length; i ++) {
- smallSubpagePools[i] = newSubpagePoolHead(pageSize);
- }
-
- q100 = new PoolChunkList<T>(this, null, 100, Integer.MAX_VALUE, chunkSize);
- q075 = new PoolChunkList<T>(this, q100, 75, 100, chunkSize);
- q050 = new PoolChunkList<T>(this, q075, 50, 100, chunkSize);
- q025 = new PoolChunkList<T>(this, q050, 25, 75, chunkSize);
- q000 = new PoolChunkList<T>(this, q025, 1, 50, chunkSize);
- qInit = new PoolChunkList<T>(this, q000, Integer.MIN_VALUE, 25, chunkSize);
-
- q100.prevList(q075);
- q075.prevList(q050);
- q050.prevList(q025);
- q025.prevList(q000);
- q000.prevList(null);
- qInit.prevList(qInit);
-
- List<PoolChunkListMetric> metrics = new ArrayList<PoolChunkListMetric>(6);
- metrics.add(qInit);
- metrics.add(q000);
- metrics.add(q025);
- metrics.add(q050);
- metrics.add(q075);
- metrics.add(q100);
- chunkListMetrics = Collections.unmodifiableList(metrics);
- }
-
- private PoolSubpage<T> newSubpagePoolHead(int pageSize) {
- PoolSubpage<T> head = new PoolSubpage<T>(pageSize);
- head.prev = head;
- head.next = head;
- return head;
- }
-
- @SuppressWarnings("unchecked")
- private PoolSubpage<T>[] newSubpagePoolArray(int size) {
- return new PoolSubpage[size];
- }
-
- abstract boolean isDirect();
-
- PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
- PooledByteBuf<T> buf = newByteBuf(maxCapacity);
- allocate(cache, buf, reqCapacity);
- return buf;
- }
-
- static int tinyIdx(int normCapacity) {
- return normCapacity >>> 4;
- }
-
- static int smallIdx(int normCapacity) {
- int tableIdx = 0;
- int i = normCapacity >>> 10;
- while (i != 0) {
- i >>>= 1;
- tableIdx ++;
- }
- return tableIdx;
- }
-
- // capacity < pageSize
- boolean isTinyOrSmall(int normCapacity) {
- return (normCapacity & subpageOverflowMask) == 0;
- }
-
- // normCapacity < 512
- static boolean isTiny(int normCapacity) {
- return (normCapacity & 0xFFFFFE00) == 0;
- }
-
- private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
- final int normCapacity = normalizeCapacity(reqCapacity);
- if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
- int tableIdx;
- PoolSubpage<T>[] table;
- boolean tiny = isTiny(normCapacity);
- if (tiny) { // < 512
- if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
- // was able to allocate out of the cache so move on
- return;
- }
- tableIdx = tinyIdx(normCapacity);
- table = tinySubpagePools;
- } else {
- if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
- // was able to allocate out of the cache so move on
- return;
- }
- tableIdx = smallIdx(normCapacity);
- table = smallSubpagePools;
- }
-
- final PoolSubpage<T> head = table[tableIdx];
-
- /**
- * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
- * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
- */
- synchronized (head) {
- final PoolSubpage<T> s = head.next;
- if (s != head) {
- assert s.doNotDestroy && s.elemSize == normCapacity;
- long handle = s.allocate();
- assert handle >= 0;
- s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);
- incTinySmallAllocation(tiny);
- return;
- }
- }
- synchronized (this) {
- allocateNormal(buf, reqCapacity, normCapacity);
- }
-
- incTinySmallAllocation(tiny);
- return;
- }
- if (normCapacity <= chunkSize) {
- if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
- // was able to allocate out of the cache so move on
- return;
- }
- synchronized (this) {
- allocateNormal(buf, reqCapacity, normCapacity);
- ++allocationsNormal;
- }
- } else {
- // Huge allocations are never served via the cache so just call allocateHuge
- allocateHuge(buf, reqCapacity);
- }
- }
-
- // Method must be called inside synchronized(this) { ... } block
- private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
- if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
- q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
- q075.allocate(buf, reqCapacity, normCapacity)) {
- return;
- }
-
- // Add a new chunk.
- PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
- boolean success = c.allocate(buf, reqCapacity, normCapacity);
- assert success;
- qInit.add(c);
- }
-
- private void incTinySmallAllocation(boolean tiny) {
- if (tiny) {
- allocationsTiny.increment();
- } else {
- allocationsSmall.increment();
- }
- }
-
- private void allocateHuge(PooledByteBuf<T> buf, int reqCapacity) {
- PoolChunk<T> chunk = newUnpooledChunk(reqCapacity);
- activeBytesHuge.add(chunk.chunkSize());
- buf.initUnpooled(chunk, reqCapacity);
- allocationsHuge.increment();
- }
-
- void free(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity, PoolThreadCache cache) {
- if (chunk.unpooled) {
- int size = chunk.chunkSize();
- destroyChunk(chunk);
- activeBytesHuge.add(-size);
- deallocationsHuge.increment();
- } else {
- SizeClass sizeClass = sizeClass(normCapacity);
- if (cache != null && cache.add(this, chunk, nioBuffer, handle, normCapacity, sizeClass)) {
- // cached, so do not free it.
- return;
- }
-
- freeChunk(chunk, handle, sizeClass, nioBuffer, false);
- }
- }
-
- private SizeClass sizeClass(int normCapacity) {
- if (!isTinyOrSmall(normCapacity)) {
- return SizeClass.Normal;
- }
- return isTiny(normCapacity) ? SizeClass.Tiny : SizeClass.Small;
- }
-
- void freeChunk(PoolChunk<T> chunk, long handle, SizeClass sizeClass, ByteBuffer nioBuffer, boolean finalizer) {
- final boolean destroyChunk;
- synchronized (this) {
- // We only call this if freeChunk is not called because of the PoolThreadCache finalizer, as otherwise this
- // may fail due to lazy class-loading in, for example, Tomcat.
- if (!finalizer) {
- switch (sizeClass) {
- case Normal:
- ++deallocationsNormal;
- break;
- case Small:
- ++deallocationsSmall;
- break;
- case Tiny:
- ++deallocationsTiny;
- break;
- default:
- throw new Error();
- }
- }
- destroyChunk = !chunk.parent.free(chunk, handle, nioBuffer);
- }
- if (destroyChunk) {
- // destroyChunk does not need to be called while holding the synchronized lock.
- destroyChunk(chunk);
- }
- }
-
- PoolSubpage<T> findSubpagePoolHead(int elemSize) {
- int tableIdx;
- PoolSubpage<T>[] table;
- if (isTiny(elemSize)) { // < 512
- tableIdx = elemSize >>> 4;
- table = tinySubpagePools;
- } else {
- tableIdx = 0;
- elemSize >>>= 10;
- while (elemSize != 0) {
- elemSize >>>= 1;
- tableIdx ++;
- }
- table = smallSubpagePools;
- }
-
- return table[tableIdx];
- }
-
- int normalizeCapacity(int reqCapacity) {
- checkPositiveOrZero(reqCapacity, "reqCapacity");
-
- if (reqCapacity >= chunkSize) {
- return directMemoryCacheAlignment == 0 ? reqCapacity : alignCapacity(reqCapacity);
- }
-
- if (!isTiny(reqCapacity)) { // >= 512
- // Doubled
-
- int normalizedCapacity = reqCapacity;
- normalizedCapacity --;
- normalizedCapacity |= normalizedCapacity >>> 1;
- normalizedCapacity |= normalizedCapacity >>> 2;
- normalizedCapacity |= normalizedCapacity >>> 4;
- normalizedCapacity |= normalizedCapacity >>> 8;
- normalizedCapacity |= normalizedCapacity >>> 16;
- normalizedCapacity ++;
-
- if (normalizedCapacity < 0) {
- normalizedCapacity >>>= 1;
- }
- assert directMemoryCacheAlignment == 0 || (normalizedCapacity & directMemoryCacheAlignmentMask) == 0;
-
- return normalizedCapacity;
- }
-
- if (directMemoryCacheAlignment > 0) {
- return alignCapacity(reqCapacity);
- }
-
- // Quantum-spaced
- if ((reqCapacity & 15) == 0) {
- return reqCapacity;
- }
-
- return (reqCapacity & ~15) + 16;
- }
-
- int alignCapacity(int reqCapacity) {
- int delta = reqCapacity & directMemoryCacheAlignmentMask;
- return delta == 0 ? reqCapacity : reqCapacity + directMemoryCacheAlignment - delta;
- }
-
- void reallocate(PooledByteBuf<T> buf, int newCapacity, boolean freeOldMemory) {
- if (newCapacity < 0 || newCapacity > buf.maxCapacity()) {
- throw new IllegalArgumentException("newCapacity: " + newCapacity);
- }
-
- int oldCapacity = buf.length;
- if (oldCapacity == newCapacity) {
- return;
- }
-
- PoolChunk<T> oldChunk = buf.chunk;
- ByteBuffer oldNioBuffer = buf.tmpNioBuf;
- long oldHandle = buf.handle;
- T oldMemory = buf.memory;
- int oldOffset = buf.offset;
- int oldMaxLength = buf.maxLength;
- int readerIndex = buf.readerIndex();
- int writerIndex = buf.writerIndex();
-
- allocate(parent.threadCache(), buf, newCapacity);
- if (newCapacity > oldCapacity) {
- memoryCopy(
- oldMemory, oldOffset,
- buf.memory, buf.offset, oldCapacity);
- } else if (newCapacity < oldCapacity) {
- if (readerIndex < newCapacity) {
- if (writerIndex > newCapacity) {
- writerIndex = newCapacity;
- }
- memoryCopy(
- oldMemory, oldOffset + readerIndex,
- buf.memory, buf.offset + readerIndex, writerIndex - readerIndex);
- } else {
- readerIndex = writerIndex = newCapacity;
- }
- }
-
- buf.setIndex(readerIndex, writerIndex);
-
- if (freeOldMemory) {
- free(oldChunk, oldNioBuffer, oldHandle, oldMaxLength, buf.cache);
- }
- }
-
- @Override
- public int numThreadCaches() {
- return numThreadCaches.get();
- }
-
- @Override
- public int numTinySubpages() {
- return tinySubpagePools.length;
- }
-
- @Override
- public int numSmallSubpages() {
- return smallSubpagePools.length;
- }
-
- @Override
- public int numChunkLists() {
- return chunkListMetrics.size();
- }
-
- @Override
- public List<PoolSubpageMetric> tinySubpages() {
- return subPageMetricList(tinySubpagePools);
- }
-
- @Override
- public List<PoolSubpageMetric> smallSubpages() {
- return subPageMetricList(smallSubpagePools);
- }
-
- @Override
- public List<PoolChunkListMetric> chunkLists() {
- return chunkListMetrics;
- }
-
- private static List<PoolSubpageMetric> subPageMetricList(PoolSubpage<?>[] pages) {
- List<PoolSubpageMetric> metrics = new ArrayList<PoolSubpageMetric>();
- for (PoolSubpage<?> head : pages) {
- if (head.next == head) {
- continue;
- }
- PoolSubpage<?> s = head.next;
- for (;;) {
- metrics.add(s);
- s = s.next;
- if (s == head) {
- break;
- }
- }
- }
- return metrics;
- }
-
- @Override
- public long numAllocations() {
- final long allocsNormal;
- synchronized (this) {
- allocsNormal = allocationsNormal;
- }
- return allocationsTiny.value() + allocationsSmall.value() + allocsNormal + allocationsHuge.value();
- }
-
- @Override
- public long numTinyAllocations() {
- return allocationsTiny.value();
- }
-
- @Override
- public long numSmallAllocations() {
- return allocationsSmall.value();
- }
-
- @Override
- public synchronized long numNormalAllocations() {
- return allocationsNormal;
- }
-
- @Override
- public long numDeallocations() {
- final long deallocs;
- synchronized (this) {
- deallocs = deallocationsTiny + deallocationsSmall + deallocationsNormal;
- }
- return deallocs + deallocationsHuge.value();
- }
-
- @Override
- public synchronized long numTinyDeallocations() {
- return deallocationsTiny;
- }
-
- @Override
- public synchronized long numSmallDeallocations() {
- return deallocationsSmall;
- }
-
- @Override
- public synchronized long numNormalDeallocations() {
- return deallocationsNormal;
- }
-
- @Override
- public long numHugeAllocations() {
- return allocationsHuge.value();
- }
-
- @Override
- public long numHugeDeallocations() {
- return deallocationsHuge.value();
- }
-
- @Override
- public long numActiveAllocations() {
- long val = allocationsTiny.value() + allocationsSmall.value() + allocationsHuge.value()
- - deallocationsHuge.value();
- synchronized (this) {
- val += allocationsNormal - (deallocationsTiny + deallocationsSmall + deallocationsNormal);
- }
- return max(val, 0);
- }
-
- @Override
- public long numActiveTinyAllocations() {
- return max(numTinyAllocations() - numTinyDeallocations(), 0);
- }
-
- @Override
- public long numActiveSmallAllocations() {
- return max(numSmallAllocations() - numSmallDeallocations(), 0);
- }
-
- @Override
- public long numActiveNormalAllocations() {
- final long val;
- synchronized (this) {
- val = allocationsNormal - deallocationsNormal;
- }
- return max(val, 0);
- }
-
- @Override
- public long numActiveHugeAllocations() {
- return max(numHugeAllocations() - numHugeDeallocations(), 0);
- }
-
- @Override
- public long numActiveBytes() {
- long val = activeBytesHuge.value();
- synchronized (this) {
- for (int i = 0; i < chunkListMetrics.size(); i++) {
- for (PoolChunkMetric m: chunkListMetrics.get(i)) {
- val += m.chunkSize();
- }
- }
- }
- return max(0, val);
- }
-
- protected abstract PoolChunk<T> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize);
- protected abstract PoolChunk<T> newUnpooledChunk(int capacity);
- protected abstract PooledByteBuf<T> newByteBuf(int maxCapacity);
- protected abstract void memoryCopy(T src, int srcOffset, T dst, int dstOffset, int length);
- protected abstract void destroyChunk(PoolChunk<T> chunk);
-
- @Override
- public synchronized String toString() {
- StringBuilder buf = new StringBuilder()
- .append("Chunk(s) at 0~25%:")
- .append(StringUtil.NEWLINE)
- .append(qInit)
- .append(StringUtil.NEWLINE)
- .append("Chunk(s) at 0~50%:")
- .append(StringUtil.NEWLINE)
- .append(q000)
- .append(StringUtil.NEWLINE)
- .append("Chunk(s) at 25~75%:")
- .append(StringUtil.NEWLINE)
- .append(q025)
- .append(StringUtil.NEWLINE)
- .append("Chunk(s) at 50~100%:")
- .append(StringUtil.NEWLINE)
- .append(q050)
- .append(StringUtil.NEWLINE)
- .append("Chunk(s) at 75~100%:")
- .append(StringUtil.NEWLINE)
- .append(q075)
- .append(StringUtil.NEWLINE)
- .append("Chunk(s) at 100%:")
- .append(StringUtil.NEWLINE)
- .append(q100)
- .append(StringUtil.NEWLINE)
- .append("tiny subpages:");
- appendPoolSubPages(buf, tinySubpagePools);
- buf.append(StringUtil.NEWLINE)
- .append("small subpages:");
- appendPoolSubPages(buf, smallSubpagePools);
- buf.append(StringUtil.NEWLINE);
-
- return buf.toString();
- }
-
- private static void appendPoolSubPages(StringBuilder buf, PoolSubpage<?>[] subpages) {
- for (int i = 0; i < subpages.length; i ++) {
- PoolSubpage<?> head = subpages[i];
- if (head.next == head) {
- continue;
- }
-
- buf.append(StringUtil.NEWLINE)
- .append(i)
- .append(": ");
- PoolSubpage<?> s = head.next;
- for (;;) {
- buf.append(s);
- s = s.next;
- if (s == head) {
- break;
- }
- }
- }
- }
-
- @Override
- protected final void finalize() throws Throwable {
- try {
- super.finalize();
- } finally {
- destroyPoolSubPages(smallSubpagePools);
- destroyPoolSubPages(tinySubpagePools);
- destroyPoolChunkLists(qInit, q000, q025, q050, q075, q100);
- }
- }
-
- private static void destroyPoolSubPages(PoolSubpage<?>[] pages) {
- for (PoolSubpage<?> page : pages) {
- page.destroy();
- }
- }
-
- private void destroyPoolChunkLists(PoolChunkList<T>... chunkLists) {
- for (PoolChunkList<T> chunkList: chunkLists) {
- chunkList.destroy(this);
- }
- }
-
- static final class HeapArena extends PoolArena<byte[]> {
-
- HeapArena(PooledByteBufAllocator parent, int pageSize, int maxOrder,
- int pageShifts, int chunkSize, int directMemoryCacheAlignment) {
- super(parent, pageSize, maxOrder, pageShifts, chunkSize,
- directMemoryCacheAlignment);
- }
-
- private static byte[] newByteArray(int size) {
- return PlatformDependent.allocateUninitializedArray(size);
- }
-
- @Override
- boolean isDirect() {
- return false;
- }
-
- @Override
- protected PoolChunk<byte[]> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) {
- return new PoolChunk<byte[]>(this, newByteArray(chunkSize), pageSize, maxOrder, pageShifts, chunkSize, 0);
- }
-
- @Override
- protected PoolChunk<byte[]> newUnpooledChunk(int capacity) {
- return new PoolChunk<byte[]>(this, newByteArray(capacity), capacity, 0);
- }
-
- @Override
- protected void destroyChunk(PoolChunk<byte[]> chunk) {
- // Rely on GC.
- }
-
- @Override
- protected PooledByteBuf<byte[]> newByteBuf(int maxCapacity) {
- return HAS_UNSAFE ? PooledUnsafeHeapByteBuf.newUnsafeInstance(maxCapacity)
- : PooledHeapByteBuf.newInstance(maxCapacity);
- }
-
- @Override
- protected void memoryCopy(byte[] src, int srcOffset, byte[] dst, int dstOffset, int length) {
- if (length == 0) {
- return;
- }
-
- System.arraycopy(src, srcOffset, dst, dstOffset, length);
- }
- }
-
- static final class DirectArena extends PoolArena<ByteBuffer> {
-
- DirectArena(PooledByteBufAllocator parent, int pageSize, int maxOrder,
- int pageShifts, int chunkSize, int directMemoryCacheAlignment) {
- super(parent, pageSize, maxOrder, pageShifts, chunkSize,
- directMemoryCacheAlignment);
- }
-
- @Override
- boolean isDirect() {
- return true;
- }
-
- // mark as package-private, only for unit test
- int offsetCacheLine(ByteBuffer memory) {
- // We can only calculate the offset if Unsafe is present as otherwise directBufferAddress(...) will
- // throw an NPE.
- int remainder = HAS_UNSAFE
- ? (int) (PlatformDependent.directBufferAddress(memory) & directMemoryCacheAlignmentMask)
- : 0;
-
- // offset = alignment - address & (alignment - 1)
- return directMemoryCacheAlignment - remainder;
- }
-
- @Override
- protected PoolChunk<ByteBuffer> newChunk(int pageSize, int maxOrder,
- int pageShifts, int chunkSize) {
- if (directMemoryCacheAlignment == 0) {
- return new PoolChunk<ByteBuffer>(this,
- allocateDirect(chunkSize), pageSize, maxOrder,
- pageShifts, chunkSize, 0);
- }
- final ByteBuffer memory = allocateDirect(chunkSize
- + directMemoryCacheAlignment);
- return new PoolChunk<ByteBuffer>(this, memory, pageSize,
- maxOrder, pageShifts, chunkSize,
- offsetCacheLine(memory));
- }
-
- @Override
- protected PoolChunk<ByteBuffer> newUnpooledChunk(int capacity) {
- if (directMemoryCacheAlignment == 0) {
- return new PoolChunk<ByteBuffer>(this,
- allocateDirect(capacity), capacity, 0);
- }
- final ByteBuffer memory = allocateDirect(capacity
- + directMemoryCacheAlignment);
- return new PoolChunk<ByteBuffer>(this, memory, capacity,
- offsetCacheLine(memory));
- }
-
- private static ByteBuffer allocateDirect(int capacity) {
- return PlatformDependent.useDirectBufferNoCleaner() ?
- PlatformDependent.allocateDirectNoCleaner(capacity) : ByteBuffer.allocateDirect(capacity);
- }
-
- @Override
- protected void destroyChunk(PoolChunk<ByteBuffer> chunk) {
- if (PlatformDependent.useDirectBufferNoCleaner()) {
- PlatformDependent.freeDirectNoCleaner(chunk.memory);
- } else {
- PlatformDependent.freeDirectBuffer(chunk.memory);
- }
- }
-
- @Override
- protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
- if (HAS_UNSAFE) {
- return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
- } else {
- return PooledDirectByteBuf.newInstance(maxCapacity);
- }
- }
-
- @Override
- protected void memoryCopy(ByteBuffer src, int srcOffset, ByteBuffer dst, int dstOffset, int length) {
- if (length == 0) {
- return;
- }
-
- if (HAS_UNSAFE) {
- PlatformDependent.copyMemory(
- PlatformDependent.directBufferAddress(src) + srcOffset,
- PlatformDependent.directBufferAddress(dst) + dstOffset, length);
- } else {
- // We must duplicate the NIO buffers because they may be accessed by other Netty buffers.
- src = src.duplicate();
- dst = dst.duplicate();
- src.position(srcOffset).limit(srcOffset + length);
- dst.position(dstOffset);
- dst.put(src);
- }
- }
- }
-}
diff --git a/flink-python/src/main/java/org/apache/beam/vendor/grpc/v1p21p0/io/netty/buffer/PoolThreadCache.java b/flink-python/src/main/java/org/apache/beam/vendor/grpc/v1p21p0/io/netty/buffer/PoolThreadCache.java
deleted file mode 100644
index 37244e5..0000000
--- a/flink-python/src/main/java/org/apache/beam/vendor/grpc/v1p21p0/io/netty/buffer/PoolThreadCache.java
+++ /dev/null
@@ -1,508 +0,0 @@
-/*
- * Copyright 2012 The Netty Project
- *
- * The Netty Project licenses this file to you under the Apache License,
- * version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.beam.vendor.grpc.v1p21p0.io.netty.buffer;
-
-
-import static org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
-
-import org.apache.beam.vendor.grpc.v1p21p0.io.netty.buffer.PoolArena.SizeClass;
-import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.Recycler;
-import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.Recycler.Handle;
-import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.MathUtil;
-import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.PlatformDependent;
-import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.logging.InternalLogger;
-import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.logging.InternalLoggerFactory;
-
-import java.nio.ByteBuffer;
-import java.util.Queue;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-// This class is copied from Netty's io.netty.buffer.PoolThreadCache,
-// can be removed after Beam bumps its shaded netty version to 1.22+ (BEAM-9030).
-//
-// Changed lines: 235, 242, 246~251, 268, 275, 280, 284, 426~427, 430, 435, 453, 458, 463~467, 469
-
-/**
- * Acts as a thread cache for allocations. This implementation is modeled after
- * <a href="http://people.freebsd.org/~jasone/jemalloc/bsdcan2006/jemalloc.pdf">jemalloc</a> and the
- * techniques described in
- * <a href="https://www.facebook.com/notes/facebook-engineering/scalable-memory-allocation-using-jemalloc/480222803919">
- * Scalable memory allocation using jemalloc</a>.
- */
-final class PoolThreadCache {
-
- private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class);
-
- final PoolArena<byte[]> heapArena;
- final PoolArena<ByteBuffer> directArena;
-
- // Hold the caches for the different size classes, which are tiny, small and normal.
- private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
- private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
- private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
- private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
- private final MemoryRegionCache<byte[]>[] normalHeapCaches;
- private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
-
- // Used for bit shifting when calculating the index of normal caches later
- private final int numShiftsNormalDirect;
- private final int numShiftsNormalHeap;
- private final int freeSweepAllocationThreshold;
- private final AtomicBoolean freed = new AtomicBoolean();
-
- private int allocations;
-
- // TODO: Test if adding padding helps under contention
- //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
-
- PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
- int tinyCacheSize, int smallCacheSize, int normalCacheSize,
- int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
- checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity");
- this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
- this.heapArena = heapArena;
- this.directArena = directArena;
- if (directArena != null) {
- tinySubPageDirectCaches = createSubPageCaches(
- tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
- smallSubPageDirectCaches = createSubPageCaches(
- smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);
-
- numShiftsNormalDirect = log2(directArena.pageSize);
- normalDirectCaches = createNormalCaches(
- normalCacheSize, maxCachedBufferCapacity, directArena);
-
- directArena.numThreadCaches.getAndIncrement();
- } else {
- // No directArena is configured so just null out all caches
- tinySubPageDirectCaches = null;
- smallSubPageDirectCaches = null;
- normalDirectCaches = null;
- numShiftsNormalDirect = -1;
- }
- if (heapArena != null) {
- // Create the caches for the heap allocations
- tinySubPageHeapCaches = createSubPageCaches(
- tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
- smallSubPageHeapCaches = createSubPageCaches(
- smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small);
-
- numShiftsNormalHeap = log2(heapArena.pageSize);
- normalHeapCaches = createNormalCaches(
- normalCacheSize, maxCachedBufferCapacity, heapArena);
-
- heapArena.numThreadCaches.getAndIncrement();
- } else {
- // No heapArena is configured so just null out all caches
- tinySubPageHeapCaches = null;
- smallSubPageHeapCaches = null;
- normalHeapCaches = null;
- numShiftsNormalHeap = -1;
- }
-
- // Only check if there are caches in use.
- if ((tinySubPageDirectCaches != null || smallSubPageDirectCaches != null || normalDirectCaches != null
- || tinySubPageHeapCaches != null || smallSubPageHeapCaches != null || normalHeapCaches != null)
- && freeSweepAllocationThreshold < 1) {
- throw new IllegalArgumentException("freeSweepAllocationThreshold: "
- + freeSweepAllocationThreshold + " (expected: > 0)");
- }
- }
-
- private static <T> MemoryRegionCache<T>[] createSubPageCaches(
- int cacheSize, int numCaches, SizeClass sizeClass) {
- if (cacheSize > 0 && numCaches > 0) {
- @SuppressWarnings("unchecked")
- MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
- for (int i = 0; i < cache.length; i++) {
- // TODO: maybe use cacheSize / cache.length
- cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
- }
- return cache;
- } else {
- return null;
- }
- }
-
- private static <T> MemoryRegionCache<T>[] createNormalCaches(
- int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {
- if (cacheSize > 0 && maxCachedBufferCapacity > 0) {
- int max = Math.min(area.chunkSize, maxCachedBufferCapacity);
- int arraySize = Math.max(1, log2(max / area.pageSize) + 1);
-
- @SuppressWarnings("unchecked")
- MemoryRegionCache<T>[] cache = new MemoryRegionCache[arraySize];
- for (int i = 0; i < cache.length; i++) {
- cache[i] = new NormalMemoryRegionCache<T>(cacheSize);
- }
- return cache;
- } else {
- return null;
- }
- }
-
- private static int log2(int val) {
- int res = 0;
- while (val > 1) {
- val >>= 1;
- res++;
- }
- return res;
- }
-
- /**
- * Try to allocate a tiny buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
- */
- boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
- return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
- }
-
- /**
- * Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
- */
- boolean allocateSmall(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
- return allocate(cacheForSmall(area, normCapacity), buf, reqCapacity);
- }
-
- /**
- * Try to allocate a normal buffer out of the cache. Returns {@code true} if successful, {@code false} otherwise
- */
- boolean allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
- return allocate(cacheForNormal(area, normCapacity), buf, reqCapacity);
- }
-
- @SuppressWarnings({ "unchecked", "rawtypes" })
- private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
- if (cache == null) {
- // no cache found so just return false here
- return false;
- }
- boolean allocated = cache.allocate(buf, reqCapacity);
- if (++ allocations >= freeSweepAllocationThreshold) {
- allocations = 0;
- trim();
- }
- return allocated;
- }
-
- /**
- * Add {@link PoolChunk} and {@code handle} to the cache if there is enough room.
- * Returns {@code true} if it fits into the cache, {@code false} otherwise.
- */
- @SuppressWarnings({ "unchecked", "rawtypes" })
- boolean add(PoolArena<?> area, PoolChunk chunk, ByteBuffer nioBuffer,
- long handle, int normCapacity, SizeClass sizeClass) {
- MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);
- if (cache == null) {
- return false;
- }
- return cache.add(chunk, nioBuffer, handle);
- }
-
- private MemoryRegionCache<?> cache(PoolArena<?> area, int normCapacity, SizeClass sizeClass) {
- switch (sizeClass) {
- case Normal:
- return cacheForNormal(area, normCapacity);
- case Small:
- return cacheForSmall(area, normCapacity);
- case Tiny:
- return cacheForTiny(area, normCapacity);
- default:
- throw new Error();
- }
- }
-
- /// TODO: In the future when we move to Java9+ we should use java.lang.ref.Cleaner.
- @Override
- protected void finalize() throws Throwable {
- try {
- super.finalize();
- } finally {
- free(true);
- }
- }
-
- /**
- * Should be called if the thread that uses this cache is about to exit, to release the resources held by the cache
- */
- void free(boolean finalizer) {
- // As free() may be called either by the finalizer or by FastThreadLocal.onRemoval(...) we need to ensure
- // we only call this one time.
- if (freed.compareAndSet(false, true)) {
- int numFreed = free(tinySubPageDirectCaches, finalizer) +
- free(smallSubPageDirectCaches, finalizer) +
- free(normalDirectCaches, finalizer) +
- free(tinySubPageHeapCaches, finalizer) +
- free(smallSubPageHeapCaches, finalizer) +
- free(normalHeapCaches, finalizer);
-
- if (numFreed > 0 && logger.isDebugEnabled()) {
- logger.debug("Freed {} thread-local buffer(s) from thread: {}", numFreed,
- Thread.currentThread().getName());
- }
-
- if (directArena != null) {
- directArena.numThreadCaches.getAndDecrement();
- }
-
- if (heapArena != null) {
- heapArena.numThreadCaches.getAndDecrement();
- }
- }
- }
-
- private static int free(MemoryRegionCache<?>[] caches, boolean finalizer) {
- if (caches == null) {
- return 0;
- }
-
- int numFreed = 0;
- for (MemoryRegionCache<?> c: caches) {
- numFreed += free(c, finalizer);
- }
- return numFreed;
- }
-
- private static int free(MemoryRegionCache<?> cache, boolean finalizer) {
- if (cache == null) {
- return 0;
- }
- return cache.free(finalizer);
- }
-
- void trim() {
- trim(tinySubPageDirectCaches);
- trim(smallSubPageDirectCaches);
- trim(normalDirectCaches);
- trim(tinySubPageHeapCaches);
- trim(smallSubPageHeapCaches);
- trim(normalHeapCaches);
- }
-
- private static void trim(MemoryRegionCache<?>[] caches) {
- if (caches == null) {
- return;
- }
- for (MemoryRegionCache<?> c: caches) {
- trim(c);
- }
- }
-
- private static void trim(MemoryRegionCache<?> cache) {
- if (cache == null) {
- return;
- }
- cache.trim();
- }
-
- private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
- int idx = PoolArena.tinyIdx(normCapacity);
- if (area.isDirect()) {
- return cache(tinySubPageDirectCaches, idx);
- }
- return cache(tinySubPageHeapCaches, idx);
- }
-
- private MemoryRegionCache<?> cacheForSmall(PoolArena<?> area, int normCapacity) {
- int idx = PoolArena.smallIdx(normCapacity);
- if (area.isDirect()) {
- return cache(smallSubPageDirectCaches, idx);
- }
- return cache(smallSubPageHeapCaches, idx);
- }
-
- private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int normCapacity) {
- if (area.isDirect()) {
- int idx = log2(normCapacity >> numShiftsNormalDirect);
- return cache(normalDirectCaches, idx);
- }
- int idx = log2(normCapacity >> numShiftsNormalHeap);
- return cache(normalHeapCaches, idx);
- }
-
- private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) {
- if (cache == null || idx > cache.length - 1) {
- return null;
- }
- return cache[idx];
- }
-
- /**
- * Cache used for buffers which are backed by TINY or SMALL size.
- */
- private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> {
- SubPageMemoryRegionCache(int size, SizeClass sizeClass) {
- super(size, sizeClass);
- }
-
- @Override
- protected void initBuf(
- PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, PooledByteBuf<T> buf, int reqCapacity) {
- chunk.initBufWithSubpage(buf, nioBuffer, handle, reqCapacity);
- }
- }
-
- /**
- * Cache used for buffers which are backed by NORMAL size.
- */
- private static final class NormalMemoryRegionCache<T> extends MemoryRegionCache<T> {
- NormalMemoryRegionCache(int size) {
- super(size, SizeClass.Normal);
- }
-
- @Override
- protected void initBuf(
- PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, PooledByteBuf<T> buf, int reqCapacity) {
- chunk.initBuf(buf, nioBuffer, handle, reqCapacity);
- }
- }
-
- private abstract static class MemoryRegionCache<T> {
- private final int size;
- private final Queue<Entry<T>> queue;
- private final SizeClass sizeClass;
- private int allocations;
-
- MemoryRegionCache(int size, SizeClass sizeClass) {
- this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
- queue = PlatformDependent.newFixedMpscQueue(this.size);
- this.sizeClass = sizeClass;
- }
-
- /**
- * Init the {@link PooledByteBuf} using the provided chunk and handle with the capacity restrictions.
- */
- protected abstract void initBuf(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle,
- PooledByteBuf<T> buf, int reqCapacity);
-
- /**
- * Add to cache if not already full.
- */
- @SuppressWarnings("unchecked")
- public final boolean add(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle) {
- Entry<T> entry = newEntry(chunk, nioBuffer, handle);
- boolean queued = queue.offer(entry);
- if (!queued) {
- // If it was not possible to cache the chunk, immediately recycle the entry
- entry.recycle();
- }
-
- return queued;
- }
-
- /**
- * Allocate something out of the cache if possible and remove the entry from the cache.
- */
- public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
- Entry<T> entry = queue.poll();
- if (entry == null) {
- return false;
- }
- initBuf(entry.chunk, entry.nioBuffer, entry.handle, buf, reqCapacity);
- entry.recycle();
-
- // allocations is not thread-safe, which is fine as this is only ever called from the same thread.
- ++ allocations;
- return true;
- }
-
- /**
- * Clear out this cache and free up all previous cached {@link PoolChunk}s and {@code handle}s.
- */
- public final int free(boolean finalizer) {
- return free(Integer.MAX_VALUE, finalizer);
- }
-
- private int free(int max, boolean finalizer) {
- int numFreed = 0;
- for (; numFreed < max; numFreed++) {
- Entry<T> entry = queue.poll();
- if (entry != null) {
- freeEntry(entry, finalizer);
- } else {
- // all cleared
- return numFreed;
- }
- }
- return numFreed;
- }
-
- /**
- * Free up cached {@link PoolChunk}s if not allocated frequently enough.
- */
- public final void trim() {
- int free = size - allocations;
- allocations = 0;
-
- // We did not even allocate the full cache size; free the entries that went unused.
- if (free > 0) {
- free(free, false);
- }
- }
-
- @SuppressWarnings({ "unchecked", "rawtypes" })
- private void freeEntry(Entry entry, boolean finalizer) {
- PoolChunk chunk = entry.chunk;
- long handle = entry.handle;
- ByteBuffer nioBuffer = entry.nioBuffer;
-
- if (!finalizer) {
- // recycle now so PoolChunk can be GC'ed. This will only be done if this is not freed because of
- // a finalizer.
- entry.recycle();
- }
-
- chunk.arena.freeChunk(chunk, handle, sizeClass, nioBuffer, finalizer);
- }
-
- static final class Entry<T> {
- final Handle<Entry<?>> recyclerHandle;
- PoolChunk<T> chunk;
- ByteBuffer nioBuffer;
- long handle = -1;
-
- Entry(Handle<Entry<?>> recyclerHandle) {
- this.recyclerHandle = recyclerHandle;
- }
-
- void recycle() {
- chunk = null;
- nioBuffer = null;
- handle = -1;
- recyclerHandle.recycle(this);
- }
- }
-
- @SuppressWarnings("rawtypes")
- private static Entry newEntry(PoolChunk<?> chunk, ByteBuffer nioBuffer, long handle) {
- Entry entry = RECYCLER.get();
- entry.chunk = chunk;
- entry.nioBuffer = nioBuffer;
- entry.handle = handle;
- return entry;
- }
-
- @SuppressWarnings("rawtypes")
- private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() {
- @SuppressWarnings("unchecked")
- @Override
- protected Entry newObject(Handle<Entry> handle) {
- return new Entry(handle);
- }
- };
- }
-}
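The `MemoryRegionCache` deleted above keeps a bounded queue of chunk handles and periodically trims entries that are not allocated from often enough (`free = size - allocations`). A minimal Python sketch of that caching and trim heuristic, using `collections.deque` in place of Netty's MPSC queue (class and method names here are illustrative, not Netty's API):

```python
from collections import deque


class RegionCache:
    """Bounded cache of reusable entries with Netty-style trim accounting."""

    def __init__(self, size):
        self.size = size
        self.queue = deque()
        self.allocations = 0  # hits since the last trim

    def add(self, entry):
        # Cache the entry only if the queue is not already full,
        # mirroring MemoryRegionCache.add's offer-or-recycle behavior.
        if len(self.queue) < self.size:
            self.queue.append(entry)
            return True
        return False

    def allocate(self):
        # Pop a cached entry, counting the hit for trim accounting.
        if not self.queue:
            return None
        self.allocations += 1
        return self.queue.popleft()

    def trim(self):
        # Free the portion of the cache that was not allocated from
        # since the last trim: free = size - allocations.
        free = self.size - self.allocations
        self.allocations = 0
        freed = 0
        while free > 0 and self.queue:
            self.queue.popleft()
            freed += 1
            free -= 1
        return freed
```

With `size=2`, adding two entries fills the cache; after one allocation, `trim()` frees `2 - 1 = 1` of the remaining cached entries.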
diff --git a/flink-python/src/main/java/org/apache/beam/vendor/grpc/v1p21p0/io/netty/buffer/PooledByteBufAllocator.java b/flink-python/src/main/java/org/apache/beam/vendor/grpc/v1p21p0/io/netty/buffer/PooledByteBufAllocator.java
deleted file mode 100644
index 2124bdc..0000000
--- a/flink-python/src/main/java/org/apache/beam/vendor/grpc/v1p21p0/io/netty/buffer/PooledByteBufAllocator.java
+++ /dev/null
@@ -1,640 +0,0 @@
-/*
- * Copyright 2012 The Netty Project
- *
- * The Netty Project licenses this file to you under the Apache License,
- * version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.beam.vendor.grpc.v1p21p0.io.netty.buffer;
-
-import static org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
-
-import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.NettyRuntime;
-import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.concurrent.FastThreadLocal;
-import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.concurrent.FastThreadLocalThread;
-import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.PlatformDependent;
-import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.StringUtil;
-import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.SystemPropertyUtil;
-import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.logging.InternalLogger;
-import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.logging.InternalLoggerFactory;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-// This class is copied from Netty's io.netty.buffer.PooledByteBufAllocator and
-// can be removed after Beam bumps its shaded netty version to 1.22+ (BEAM-9030).
-//
-// Changed lines: 458
-
-public class PooledByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocatorMetricProvider {
-
- private static final InternalLogger logger = InternalLoggerFactory.getInstance(PooledByteBufAllocator.class);
- private static final int DEFAULT_NUM_HEAP_ARENA;
- private static final int DEFAULT_NUM_DIRECT_ARENA;
-
- private static final int DEFAULT_PAGE_SIZE;
- private static final int DEFAULT_MAX_ORDER; // 8192 << 11 = 16 MiB per chunk
- private static final int DEFAULT_TINY_CACHE_SIZE;
- private static final int DEFAULT_SMALL_CACHE_SIZE;
- private static final int DEFAULT_NORMAL_CACHE_SIZE;
- private static final int DEFAULT_MAX_CACHED_BUFFER_CAPACITY;
- private static final int DEFAULT_CACHE_TRIM_INTERVAL;
- private static final boolean DEFAULT_USE_CACHE_FOR_ALL_THREADS;
- private static final int DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT;
- static final int DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK;
-
- private static final int MIN_PAGE_SIZE = 4096;
- private static final int MAX_CHUNK_SIZE = (int) (((long) Integer.MAX_VALUE + 1) / 2);
-
- static {
- int defaultPageSize = SystemPropertyUtil.getInt("io.netty.allocator.pageSize", 8192);
- Throwable pageSizeFallbackCause = null;
- try {
- validateAndCalculatePageShifts(defaultPageSize);
- } catch (Throwable t) {
- pageSizeFallbackCause = t;
- defaultPageSize = 8192;
- }
- DEFAULT_PAGE_SIZE = defaultPageSize;
-
- int defaultMaxOrder = SystemPropertyUtil.getInt("io.netty.allocator.maxOrder", 11);
- Throwable maxOrderFallbackCause = null;
- try {
- validateAndCalculateChunkSize(DEFAULT_PAGE_SIZE, defaultMaxOrder);
- } catch (Throwable t) {
- maxOrderFallbackCause = t;
- defaultMaxOrder = 11;
- }
- DEFAULT_MAX_ORDER = defaultMaxOrder;
-
- // Determine reasonable default for nHeapArena and nDirectArena.
- // Assuming each arena has 3 chunks, the pool should not consume more than 50% of max memory.
- final Runtime runtime = Runtime.getRuntime();
-
- /*
- * We use 2 * available processors by default to reduce contention as we use 2 * available processors for the
- * number of EventLoops in NIO and EPOLL as well. If we choose a smaller number we will run into hot spots as
- * allocation and de-allocation needs to be synchronized on the PoolArena.
- *
- * See https://github.com/netty/netty/issues/3888.
- */
- final int defaultMinNumArena = NettyRuntime.availableProcessors() * 2;
- final int defaultChunkSize = DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER;
- DEFAULT_NUM_HEAP_ARENA = Math.max(0,
- SystemPropertyUtil.getInt(
- "io.netty.allocator.numHeapArenas",
- (int) Math.min(
- defaultMinNumArena,
- runtime.maxMemory() / defaultChunkSize / 2 / 3)));
- DEFAULT_NUM_DIRECT_ARENA = Math.max(0,
- SystemPropertyUtil.getInt(
- "io.netty.allocator.numDirectArenas",
- (int) Math.min(
- defaultMinNumArena,
- PlatformDependent.maxDirectMemory() / defaultChunkSize / 2 / 3)));
-
- // cache sizes
- DEFAULT_TINY_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.tinyCacheSize", 512);
- DEFAULT_SMALL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.smallCacheSize", 256);
- DEFAULT_NORMAL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.normalCacheSize", 64);
-
- // 32 kb is the default maximum capacity of the cached buffer. Similar to what is explained in
- // 'Scalable memory allocation using jemalloc'
- DEFAULT_MAX_CACHED_BUFFER_CAPACITY = SystemPropertyUtil.getInt(
- "io.netty.allocator.maxCachedBufferCapacity", 32 * 1024);
-
- // the threshold number of allocations after which cached entries will be freed up if not used frequently
- DEFAULT_CACHE_TRIM_INTERVAL = SystemPropertyUtil.getInt(
- "io.netty.allocator.cacheTrimInterval", 8192);
-
- DEFAULT_USE_CACHE_FOR_ALL_THREADS = SystemPropertyUtil.getBoolean(
- "io.netty.allocator.useCacheForAllThreads", true);
-
- DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT = SystemPropertyUtil.getInt(
- "io.netty.allocator.directMemoryCacheAlignment", 0);
-
- // Use 1023 by default as we use an ArrayDeque as backing storage which will then allocate an internal array
- // of 1024 elements. Otherwise we would allocate 2048 and only use 1024 which is wasteful.
- DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK = SystemPropertyUtil.getInt(
- "io.netty.allocator.maxCachedByteBuffersPerChunk", 1023);
-
- if (logger.isDebugEnabled()) {
- logger.debug("-Dio.netty.allocator.numHeapArenas: {}", DEFAULT_NUM_HEAP_ARENA);
- logger.debug("-Dio.netty.allocator.numDirectArenas: {}", DEFAULT_NUM_DIRECT_ARENA);
- if (pageSizeFallbackCause == null) {
- logger.debug("-Dio.netty.allocator.pageSize: {}", DEFAULT_PAGE_SIZE);
- } else {
- logger.debug("-Dio.netty.allocator.pageSize: {}", DEFAULT_PAGE_SIZE, pageSizeFallbackCause);
- }
- if (maxOrderFallbackCause == null) {
- logger.debug("-Dio.netty.allocator.maxOrder: {}", DEFAULT_MAX_ORDER);
- } else {
- logger.debug("-Dio.netty.allocator.maxOrder: {}", DEFAULT_MAX_ORDER, maxOrderFallbackCause);
- }
- logger.debug("-Dio.netty.allocator.chunkSize: {}", DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER);
- logger.debug("-Dio.netty.allocator.tinyCacheSize: {}", DEFAULT_TINY_CACHE_SIZE);
- logger.debug("-Dio.netty.allocator.smallCacheSize: {}", DEFAULT_SMALL_CACHE_SIZE);
- logger.debug("-Dio.netty.allocator.normalCacheSize: {}", DEFAULT_NORMAL_CACHE_SIZE);
- logger.debug("-Dio.netty.allocator.maxCachedBufferCapacity: {}", DEFAULT_MAX_CACHED_BUFFER_CAPACITY);
- logger.debug("-Dio.netty.allocator.cacheTrimInterval: {}", DEFAULT_CACHE_TRIM_INTERVAL);
- logger.debug("-Dio.netty.allocator.useCacheForAllThreads: {}", DEFAULT_USE_CACHE_FOR_ALL_THREADS);
- logger.debug("-Dio.netty.allocator.maxCachedByteBuffersPerChunk: {}",
- DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK);
- }
- }
-
- public static final PooledByteBufAllocator DEFAULT =
- new PooledByteBufAllocator(PlatformDependent.directBufferPreferred());
-
- private final PoolArena<byte[]>[] heapArenas;
- private final PoolArena<ByteBuffer>[] directArenas;
- private final int tinyCacheSize;
- private final int smallCacheSize;
- private final int normalCacheSize;
- private final List<PoolArenaMetric> heapArenaMetrics;
- private final List<PoolArenaMetric> directArenaMetrics;
- private final PoolThreadLocalCache threadCache;
- private final int chunkSize;
- private final PooledByteBufAllocatorMetric metric;
-
- public PooledByteBufAllocator() {
- this(false);
- }
-
- @SuppressWarnings("deprecation")
- public PooledByteBufAllocator(boolean preferDirect) {
- this(preferDirect, DEFAULT_NUM_HEAP_ARENA, DEFAULT_NUM_DIRECT_ARENA, DEFAULT_PAGE_SIZE, DEFAULT_MAX_ORDER);
- }
-
- @SuppressWarnings("deprecation")
- public PooledByteBufAllocator(int nHeapArena, int nDirectArena, int pageSize, int maxOrder) {
- this(false, nHeapArena, nDirectArena, pageSize, maxOrder);
- }
-
- /**
- * @deprecated use
- * {@link PooledByteBufAllocator#PooledByteBufAllocator(boolean, int, int, int, int, int, int, int, boolean)}
- */
- @Deprecated
- public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder) {
- this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder,
- DEFAULT_TINY_CACHE_SIZE, DEFAULT_SMALL_CACHE_SIZE, DEFAULT_NORMAL_CACHE_SIZE);
- }
-
- /**
- * @deprecated use
- * {@link PooledByteBufAllocator#PooledByteBufAllocator(boolean, int, int, int, int, int, int, int, boolean)}
- */
- @Deprecated
- public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
- int tinyCacheSize, int smallCacheSize, int normalCacheSize) {
- this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder, tinyCacheSize, smallCacheSize,
- normalCacheSize, DEFAULT_USE_CACHE_FOR_ALL_THREADS, DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT);
- }
-
- public PooledByteBufAllocator(boolean preferDirect, int nHeapArena,
- int nDirectArena, int pageSize, int maxOrder, int tinyCacheSize,
- int smallCacheSize, int normalCacheSize,
- boolean useCacheForAllThreads) {
- this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder,
- tinyCacheSize, smallCacheSize, normalCacheSize,
- useCacheForAllThreads, DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT);
- }
-
- public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
- int tinyCacheSize, int smallCacheSize, int normalCacheSize,
- boolean useCacheForAllThreads, int directMemoryCacheAlignment) {
- super(preferDirect);
- threadCache = new PoolThreadLocalCache(useCacheForAllThreads);
- this.tinyCacheSize = tinyCacheSize;
- this.smallCacheSize = smallCacheSize;
- this.normalCacheSize = normalCacheSize;
- chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);
-
- checkPositiveOrZero(nHeapArena, "nHeapArena");
- checkPositiveOrZero(nDirectArena, "nDirectArena");
-
- checkPositiveOrZero(directMemoryCacheAlignment, "directMemoryCacheAlignment");
- if (directMemoryCacheAlignment > 0 && !isDirectMemoryCacheAlignmentSupported()) {
- throw new IllegalArgumentException("directMemoryCacheAlignment is not supported");
- }
-
- if ((directMemoryCacheAlignment & -directMemoryCacheAlignment) != directMemoryCacheAlignment) {
- throw new IllegalArgumentException("directMemoryCacheAlignment: "
- + directMemoryCacheAlignment + " (expected: power of two)");
- }
-
- int pageShifts = validateAndCalculatePageShifts(pageSize);
-
- if (nHeapArena > 0) {
- heapArenas = newArenaArray(nHeapArena);
- List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length);
- for (int i = 0; i < heapArenas.length; i ++) {
- PoolArena.HeapArena arena = new PoolArena.HeapArena(this,
- pageSize, maxOrder, pageShifts, chunkSize,
- directMemoryCacheAlignment);
- heapArenas[i] = arena;
- metrics.add(arena);
- }
- heapArenaMetrics = Collections.unmodifiableList(metrics);
- } else {
- heapArenas = null;
- heapArenaMetrics = Collections.emptyList();
- }
-
- if (nDirectArena > 0) {
- directArenas = newArenaArray(nDirectArena);
- List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length);
- for (int i = 0; i < directArenas.length; i ++) {
- PoolArena.DirectArena arena = new PoolArena.DirectArena(
- this, pageSize, maxOrder, pageShifts, chunkSize, directMemoryCacheAlignment);
- directArenas[i] = arena;
- metrics.add(arena);
- }
- directArenaMetrics = Collections.unmodifiableList(metrics);
- } else {
- directArenas = null;
- directArenaMetrics = Collections.emptyList();
- }
- metric = new PooledByteBufAllocatorMetric(this);
- }
-
- @SuppressWarnings("unchecked")
- private static <T> PoolArena<T>[] newArenaArray(int size) {
- return new PoolArena[size];
- }
-
- private static int validateAndCalculatePageShifts(int pageSize) {
- if (pageSize < MIN_PAGE_SIZE) {
- throw new IllegalArgumentException("pageSize: " + pageSize + " (expected: " + MIN_PAGE_SIZE + ")");
- }
-
- if ((pageSize & pageSize - 1) != 0) {
- throw new IllegalArgumentException("pageSize: " + pageSize + " (expected: power of 2)");
- }
-
- // Logarithm base 2. At this point we know that pageSize is a power of two.
- return Integer.SIZE - 1 - Integer.numberOfLeadingZeros(pageSize);
- }
-
- private static int validateAndCalculateChunkSize(int pageSize, int maxOrder) {
- if (maxOrder > 14) {
- throw new IllegalArgumentException("maxOrder: " + maxOrder + " (expected: 0-14)");
- }
-
- // Ensure the resulting chunkSize does not overflow.
- int chunkSize = pageSize;
- for (int i = maxOrder; i > 0; i --) {
- if (chunkSize > MAX_CHUNK_SIZE / 2) {
- throw new IllegalArgumentException(String.format(
- "pageSize (%d) << maxOrder (%d) must not exceed %d", pageSize, maxOrder, MAX_CHUNK_SIZE));
- }
- chunkSize <<= 1;
- }
- return chunkSize;
- }
-
- @Override
- protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
- PoolThreadCache cache = threadCache.get();
- PoolArena<byte[]> heapArena = cache.heapArena;
-
- final ByteBuf buf;
- if (heapArena != null) {
- buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
- } else {
- buf = PlatformDependent.hasUnsafe() ?
- new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
- new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
- }
-
- return toLeakAwareBuffer(buf);
- }
-
- @Override
- protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
- PoolThreadCache cache = threadCache.get();
- PoolArena<ByteBuffer> directArena = cache.directArena;
-
- final ByteBuf buf;
- if (directArena != null) {
- buf = directArena.allocate(cache, initialCapacity, maxCapacity);
- } else {
- buf = PlatformDependent.hasUnsafe() ?
- UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
- new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
- }
-
- return toLeakAwareBuffer(buf);
- }
-
- /**
- * Default number of heap arenas - System Property: io.netty.allocator.numHeapArenas - default 2 * cores
- */
- public static int defaultNumHeapArena() {
- return DEFAULT_NUM_HEAP_ARENA;
- }
-
- /**
- * Default number of direct arenas - System Property: io.netty.allocator.numDirectArenas - default 2 * cores
- */
- public static int defaultNumDirectArena() {
- return DEFAULT_NUM_DIRECT_ARENA;
- }
-
- /**
- * Default buffer page size - System Property: io.netty.allocator.pageSize - default 8192
- */
- public static int defaultPageSize() {
- return DEFAULT_PAGE_SIZE;
- }
-
- /**
- * Default maximum order - System Property: io.netty.allocator.maxOrder - default 11
- */
- public static int defaultMaxOrder() {
- return DEFAULT_MAX_ORDER;
- }
-
- /**
- * Default thread caching behavior - System Property: io.netty.allocator.useCacheForAllThreads - default true
- */
- public static boolean defaultUseCacheForAllThreads() {
- return DEFAULT_USE_CACHE_FOR_ALL_THREADS;
- }
-
- /**
- * Default prefer direct - System Property: io.netty.noPreferDirect - default false
- */
- public static boolean defaultPreferDirect() {
- return PlatformDependent.directBufferPreferred();
- }
-
- /**
- * Default tiny cache size - System Property: io.netty.allocator.tinyCacheSize - default 512
- */
- public static int defaultTinyCacheSize() {
- return DEFAULT_TINY_CACHE_SIZE;
- }
-
- /**
- * Default small cache size - System Property: io.netty.allocator.smallCacheSize - default 256
- */
- public static int defaultSmallCacheSize() {
- return DEFAULT_SMALL_CACHE_SIZE;
- }
-
- /**
- * Default normal cache size - System Property: io.netty.allocator.normalCacheSize - default 64
- */
- public static int defaultNormalCacheSize() {
- return DEFAULT_NORMAL_CACHE_SIZE;
- }
-
- /**
- * Return {@code true} if direct memory cache alignment is supported, {@code false} otherwise.
- */
- public static boolean isDirectMemoryCacheAlignmentSupported() {
- return PlatformDependent.hasUnsafe();
- }
-
- @Override
- public boolean isDirectBufferPooled() {
- return directArenas != null;
- }
-
- /**
- * Returns {@code true} if the calling {@link Thread} has a {@link ThreadLocal} cache for the allocated
- * buffers.
- */
- @Deprecated
- public boolean hasThreadLocalCache() {
- return threadCache.isSet();
- }
-
- /**
- * Free all cached buffers for the calling {@link Thread}.
- */
- @Deprecated
- public void freeThreadLocalCache() {
- threadCache.remove();
- }
-
- final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
- private final boolean useCacheForAllThreads;
-
- PoolThreadLocalCache(boolean useCacheForAllThreads) {
- this.useCacheForAllThreads = useCacheForAllThreads;
- }
-
- @Override
- protected synchronized PoolThreadCache initialValue() {
- final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
- final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
-
- Thread current = Thread.currentThread();
- if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {
- return new PoolThreadCache(
- heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
- DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
- }
- // No caching so just use 0 as sizes.
- return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
- }
-
- @Override
- protected void onRemoval(PoolThreadCache threadCache) {
- threadCache.free(false);
- }
-
- private <T> PoolArena<T> leastUsedArena(PoolArena<T>[] arenas) {
- if (arenas == null || arenas.length == 0) {
- return null;
- }
-
- PoolArena<T> minArena = arenas[0];
- for (int i = 1; i < arenas.length; i++) {
- PoolArena<T> arena = arenas[i];
- if (arena.numThreadCaches.get() < minArena.numThreadCaches.get()) {
- minArena = arena;
- }
- }
-
- return minArena;
- }
- }
-
- @Override
- public PooledByteBufAllocatorMetric metric() {
- return metric;
- }
-
- /**
- * Return the number of heap arenas.
- *
- * @deprecated use {@link PooledByteBufAllocatorMetric#numHeapArenas()}.
- */
- @Deprecated
- public int numHeapArenas() {
- return heapArenaMetrics.size();
- }
-
- /**
- * Return the number of direct arenas.
- *
- * @deprecated use {@link PooledByteBufAllocatorMetric#numDirectArenas()}.
- */
- @Deprecated
- public int numDirectArenas() {
- return directArenaMetrics.size();
- }
-
- /**
- * Return a {@link List} of all heap {@link PoolArenaMetric}s that are provided by this pool.
- *
- * @deprecated use {@link PooledByteBufAllocatorMetric#heapArenas()}.
- */
- @Deprecated
- public List<PoolArenaMetric> heapArenas() {
- return heapArenaMetrics;
- }
-
- /**
- * Return a {@link List} of all direct {@link PoolArenaMetric}s that are provided by this pool.
- *
- * @deprecated use {@link PooledByteBufAllocatorMetric#directArenas()}.
- */
- @Deprecated
- public List<PoolArenaMetric> directArenas() {
- return directArenaMetrics;
- }
-
- /**
- * Return the number of thread local caches used by this {@link PooledByteBufAllocator}.
- *
- * @deprecated use {@link PooledByteBufAllocatorMetric#numThreadLocalCaches()}.
- */
- @Deprecated
- public int numThreadLocalCaches() {
- PoolArena<?>[] arenas = heapArenas != null ? heapArenas : directArenas;
- if (arenas == null) {
- return 0;
- }
-
- int total = 0;
- for (PoolArena<?> arena : arenas) {
- total += arena.numThreadCaches.get();
- }
-
- return total;
- }
-
- /**
- * Return the size of the tiny cache.
- *
- * @deprecated use {@link PooledByteBufAllocatorMetric#tinyCacheSize()}.
- */
- @Deprecated
- public int tinyCacheSize() {
- return tinyCacheSize;
- }
-
- /**
- * Return the size of the small cache.
- *
- * @deprecated use {@link PooledByteBufAllocatorMetric#smallCacheSize()}.
- */
- @Deprecated
- public int smallCacheSize() {
- return smallCacheSize;
- }
-
- /**
- * Return the size of the normal cache.
- *
- * @deprecated use {@link PooledByteBufAllocatorMetric#normalCacheSize()}.
- */
- @Deprecated
- public int normalCacheSize() {
- return normalCacheSize;
- }
-
- /**
- * Return the chunk size for an arena.
- *
- * @deprecated use {@link PooledByteBufAllocatorMetric#chunkSize()}.
- */
- @Deprecated
- public final int chunkSize() {
- return chunkSize;
- }
-
- final long usedHeapMemory() {
- return usedMemory(heapArenas);
- }
-
- final long usedDirectMemory() {
- return usedMemory(directArenas);
- }
-
- private static long usedMemory(PoolArena<?>[] arenas) {
- if (arenas == null) {
- return -1;
- }
- long used = 0;
- for (PoolArena<?> arena : arenas) {
- used += arena.numActiveBytes();
- if (used < 0) {
- return Long.MAX_VALUE;
- }
- }
- return used;
- }
-
- final PoolThreadCache threadCache() {
- PoolThreadCache cache = threadCache.get();
- assert cache != null;
- return cache;
- }
-
- /**
- * Returns the status of the allocator (which contains all metrics) as a string. Be aware this may be expensive
- * and so should not be called too frequently.
- */
- public String dumpStats() {
- int heapArenasLen = heapArenas == null ? 0 : heapArenas.length;
- StringBuilder buf = new StringBuilder(512)
- .append(heapArenasLen)
- .append(" heap arena(s):")
- .append(StringUtil.NEWLINE);
- if (heapArenasLen > 0) {
- for (PoolArena<byte[]> a: heapArenas) {
- buf.append(a);
- }
- }
-
- int directArenasLen = directArenas == null ? 0 : directArenas.length;
-
- buf.append(directArenasLen)
- .append(" direct arena(s):")
- .append(StringUtil.NEWLINE);
- if (directArenasLen > 0) {
- for (PoolArena<ByteBuffer> a: directArenas) {
- buf.append(a);
- }
- }
-
- return buf.toString();
- }
-}
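The deleted allocator derives its chunk size as `pageSize << maxOrder` (8192 << 11 = 16 MiB by default) after validating that the page size is a power of two of at least 4096 and that the shift cannot overflow. That arithmetic can be sketched outside of Java (the constants mirror the defaults in the code above):

```python
MIN_PAGE_SIZE = 4096
MAX_CHUNK_SIZE = (2 ** 31) // 2  # (Integer.MAX_VALUE + 1) / 2


def page_shifts(page_size):
    # log2(page_size); valid only for powers of two >= MIN_PAGE_SIZE,
    # mirroring validateAndCalculatePageShifts.
    if page_size < MIN_PAGE_SIZE or page_size & (page_size - 1):
        raise ValueError(f"pageSize: {page_size} (expected: power of 2 >= {MIN_PAGE_SIZE})")
    return page_size.bit_length() - 1


def chunk_size(page_size, max_order):
    # chunkSize = pageSize << maxOrder, guarding against overflow,
    # mirroring validateAndCalculateChunkSize.
    if not 0 <= max_order <= 14:
        raise ValueError(f"maxOrder: {max_order} (expected: 0-14)")
    size = page_size
    for _ in range(max_order):
        if size > MAX_CHUNK_SIZE // 2:
            raise ValueError("pageSize << maxOrder must not exceed MAX_CHUNK_SIZE")
        size <<= 1
    return size
```

With the defaults, `page_shifts(8192)` is 13 and `chunk_size(8192, 11)` is 16 MiB, matching the comment on `DEFAULT_MAX_ORDER` above.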
diff --git a/flink-python/src/main/java/org/apache/flink/datastream/runtime/runners/python/beam/BeamDataStreamPythonStatelessFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/datastream/runtime/runners/python/beam/BeamDataStreamPythonStatelessFunctionRunner.java
index 761d9d8..488d8fa 100644
--- a/flink-python/src/main/java/org/apache/flink/datastream/runtime/runners/python/beam/BeamDataStreamPythonStatelessFunctionRunner.java
+++ b/flink-python/src/main/java/org/apache/flink/datastream/runtime/runners/python/beam/BeamDataStreamPythonStatelessFunctionRunner.java
@@ -82,7 +82,7 @@ public class BeamDataStreamPythonStatelessFunctionRunner extends BeamPythonState
.setSpec(
RunnerApi.FunctionSpec.newBuilder()
.setUrn(this.coderUrn)
- .setPayload(org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.ByteString.copyFrom(
+ .setPayload(org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString.copyFrom(
PythonTypeUtils.TypeInfoToProtoConverter.toTypeInfoProto(builtFieldType).toByteArray()
)).build()
).build();
diff --git a/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java b/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
index 791c9d4..7da5563 100644
--- a/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
+++ b/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
@@ -136,7 +136,7 @@ public class PythonOptions {
.defaultValue("python")
.withDescription("Specify the path of the python interpreter used to execute the python " +
"UDF worker. The python UDF worker depends on Python 3.5+, Apache Beam " +
- "(version == 2.19.0), Pip (version >= 7.1.0) and SetupTools (version >= 37.0.0). " +
+ "(version == 2.23.0), Pip (version >= 7.1.0) and SetupTools (version >= 37.0.0). " +
"Please ensure that the specified environment meets the above requirements. The " +
"option is equivalent to the command line option \"-pyexec\".");
diff --git a/flink-python/src/main/java/org/apache/flink/python/metric/FlinkMetricContainer.java b/flink-python/src/main/java/org/apache/flink/python/metric/FlinkMetricContainer.java
index dc61cc5..ae36b0a 100644
--- a/flink-python/src/main/java/org/apache/flink/python/metric/FlinkMetricContainer.java
+++ b/flink-python/src/main/java/org/apache/flink/python/metric/FlinkMetricContainer.java
@@ -106,9 +106,14 @@ public final class FlinkMetricContainer {
private boolean isUserMetric(MetricResult metricResult) {
MetricName metricName = metricResult.getKey().metricName();
- return (metricName instanceof MonitoringInfoMetricName) &&
- ((MonitoringInfoMetricName) metricName).getUrn()
- .contains(MonitoringInfoConstants.Urns.USER_COUNTER);
+ if (metricName instanceof MonitoringInfoMetricName) {
+ String urn = ((MonitoringInfoMetricName) metricName).getUrn();
+ return urn.contains(MonitoringInfoConstants.Urns.USER_SUM_INT64) ||
+ urn.contains(MonitoringInfoConstants.Urns.USER_SUM_DOUBLE) ||
+ urn.contains(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_DOUBLE) ||
+ urn.contains(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_INT64);
+ }
+ return false;
}
private void updateCounterOrMeter(Iterable<MetricResult<Long>> counters) {
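Beam 2.23 replaces the single `USER_COUNTER` URN with typed user-metric URNs, which is why `isUserMetric` above now matches four URN constants instead of one. The matching logic amounts to a substring check against a fixed set; a hedged Python sketch (the URN strings approximate Beam's `MonitoringInfoConstants.Urns` values and are hard-coded here for illustration):

```python
# Typed user-metric URNs (illustrative values approximating the
# constants referenced in the hunk above).
USER_METRIC_URNS = (
    "beam:metric:user:sum_int64",
    "beam:metric:user:sum_double",
    "beam:metric:user:distribution_int64",
    "beam:metric:user:distribution_double",
)


def is_user_metric(urn):
    # Mirrors FlinkMetricContainer.isUserMetric: a metric is user-defined
    # if its URN contains one of the typed user-metric URNs.
    return any(user_urn in urn for user_urn in USER_METRIC_URNS)
```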
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
index 8ebf3da..2516147 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
@@ -44,7 +44,8 @@ import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.Struct;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Struct;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -227,7 +228,7 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner {
remoteBundle = stageBundleFactory.getBundle(createOutputReceiverFactory(), stateRequestHandler, progressHandler);
mainInputReceiver =
Preconditions.checkNotNull(
- remoteBundle.getInputReceivers().get(MAIN_INPUT_ID),
+ Iterables.getOnlyElement(remoteBundle.getInputReceivers().values()),
"Failed to retrieve main input receiver.");
} catch (Throwable t) {
throw new RuntimeException("Failed to start remote bundle", t);
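The bundle wiring above now fetches the main input receiver with Guava's `Iterables.getOnlyElement` instead of looking up a fixed `MAIN_INPUT_ID` key, failing fast if the bundle exposes anything other than exactly one input. The contract of `getOnlyElement` can be sketched as:

```python
def get_only_element(iterable):
    # Return the single element of `iterable`, mirroring Guava's
    # Iterables.getOnlyElement: raise if there are zero or several.
    it = iter(iterable)
    try:
        first = next(it)
    except StopIteration:
        raise ValueError("expected exactly one element, got none") from None
    try:
        next(it)
    except StopIteration:
        return first
    raise ValueError("expected exactly one element, got several")
```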
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonStatelessFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonStatelessFunctionRunner.java
index f97e26b..91acbc8 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonStatelessFunctionRunner.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonStatelessFunctionRunner.java
@@ -40,6 +40,8 @@ import org.apache.beam.sdk.util.WindowedValue;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -100,7 +102,7 @@ public abstract class BeamPythonStatelessFunctionRunner extends BeamPythonFuncti
.setSpec(RunnerApi.FunctionSpec.newBuilder()
.setUrn(functionUrn)
.setPayload(
- org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.ByteString.copyFrom(
+ org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString.copyFrom(
getUserDefinedFunctionsProtoBytes()))
.build())
.putInputs(MAIN_INPUT_NAME, INPUT_ID)
@@ -137,18 +139,28 @@ public abstract class BeamPythonStatelessFunctionRunner extends BeamPythonFuncti
components, createPythonExecutionEnvironment(), input, sideInputs, userStates, timers, transforms, outputs, createValueOnlyWireCoderSetting());
}
- private RunnerApi.WireCoderSetting createValueOnlyWireCoderSetting() throws IOException {
+ private Collection<RunnerApi.ExecutableStagePayload.WireCoderSetting> createValueOnlyWireCoderSetting() throws IOException {
WindowedValue<byte[]> value = WindowedValue.valueInGlobalWindow(new byte[0]);
Coder<? extends BoundedWindow> windowCoder = GlobalWindow.Coder.INSTANCE;
WindowedValue.FullWindowedValueCoder<byte[]> windowedValueCoder =
WindowedValue.FullWindowedValueCoder.of(ByteArrayCoder.of(), windowCoder);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
windowedValueCoder.encode(value, baos);
- return RunnerApi.WireCoderSetting.newBuilder()
- .setUrn(getUrn(RunnerApi.StandardCoders.Enum.PARAM_WINDOWED_VALUE))
- .setPayload(
- org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.ByteString.copyFrom(baos.toByteArray()))
- .build();
+
+ return Arrays.asList(
+ RunnerApi.ExecutableStagePayload.WireCoderSetting.newBuilder()
+ .setUrn(getUrn(RunnerApi.StandardCoders.Enum.PARAM_WINDOWED_VALUE))
+ .setPayload(
+ org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString.copyFrom(baos.toByteArray()))
+ .setInputOrOutputId(INPUT_ID)
+ .build(),
+ RunnerApi.ExecutableStagePayload.WireCoderSetting.newBuilder()
+ .setUrn(getUrn(RunnerApi.StandardCoders.Enum.PARAM_WINDOWED_VALUE))
+ .setPayload(
+ org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString.copyFrom(baos.toByteArray()))
+ .setInputOrOutputId(OUTPUT_ID)
+ .build()
+ );
}
protected abstract byte[] getUserDefinedFunctionsProtoBytes();
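The hunk above reflects a Beam 2.23 API change: a wire coder setting is now attached per input and per output id (`setInputOrOutputId`) rather than returned as a single setting. Building the pair reduces to stamping the same URN and payload onto two endpoints; a sketch with plain dicts standing in for the protobuf builders (the URN string is assumed to match Beam's standard `PARAM_WINDOWED_VALUE` coder URN):

```python
def value_only_wire_coder_settings(payload, input_id, output_id):
    # One PARAM_WINDOWED_VALUE setting per endpoint, mirroring the shape
    # of createValueOnlyWireCoderSetting above; dicts replace the
    # RunnerApi protobuf builders for illustration.
    urn = "beam:coder:param_windowed_value:v1"
    return [
        {"urn": urn, "payload": payload, "input_or_output_id": endpoint}
        for endpoint in (input_id, output_id)
    ]
```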
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonStatelessFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonStatelessFunctionRunner.java
index fc97b16..91ff931 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonStatelessFunctionRunner.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonStatelessFunctionRunner.java
@@ -65,7 +65,7 @@ public class BeamTablePythonStatelessFunctionRunner extends BeamPythonStatelessF
.setSpec(
RunnerApi.FunctionSpec.newBuilder()
.setUrn(coderUrn)
- .setPayload(org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.ByteString.copyFrom(
+ .setPayload(org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString.copyFrom(
toProtoType(rowType).getRowSchema().toByteArray()))
.build())
.build();
diff --git a/flink-python/src/main/resources/META-INF/NOTICE b/flink-python/src/main/resources/META-INF/NOTICE
index c7c6a96..1409c57 100644
--- a/flink-python/src/main/resources/META-INF/NOTICE
+++ b/flink-python/src/main/resources/META-INF/NOTICE
@@ -16,24 +16,24 @@ This project bundles the following dependencies under the Apache Software Licens
- org.apache.arrow:arrow-format:0.16.0
- org.apache.arrow:arrow-memory:0.16.0
- org.apache.arrow:arrow-vector:0.16.0
-- org.apache.beam:beam-model-fn-execution:2.19.0
-- org.apache.beam:beam-model-job-management:2.19.0
-- org.apache.beam:beam-model-pipeline:2.19.0
-- org.apache.beam:beam-runners-core-construction-java:2.19.0
-- org.apache.beam:beam-runners-core-java:2.19.0
-- org.apache.beam:beam-runners-java-fn-execution:2.19.0
-- org.apache.beam:beam-sdks-java-core:2.19.0
-- org.apache.beam:beam-sdks-java-fn-execution:2.19.0
-- org.apache.beam:beam-vendor-bytebuddy-1_9_3:0.1
-- org.apache.beam:beam-vendor-sdks-java-extensions-protobuf:2.19.0
+- org.apache.beam:beam-model-fn-execution:2.23.0
+- org.apache.beam:beam-model-job-management:2.23.0
+- org.apache.beam:beam-model-pipeline:2.23.0
+- org.apache.beam:beam-runners-core-construction-java:2.23.0
+- org.apache.beam:beam-runners-core-java:2.23.0
+- org.apache.beam:beam-runners-java-fn-execution:2.23.0
+- org.apache.beam:beam-sdks-java-core:2.23.0
+- org.apache.beam:beam-sdks-java-fn-execution:2.23.0
+- org.apache.beam:beam-vendor-bytebuddy-1_10_8:0.1
+- org.apache.beam:beam-vendor-sdks-java-extensions-protobuf:2.23.0
- org.apache.beam:beam-vendor-guava-26_0-jre:0.1
-- org.apache.beam:beam-vendor-grpc-1_21_0:0.1
+- org.apache.beam:beam-vendor-grpc-1_26_0:0.3
This project bundles the following dependencies under the BSD license.
See bundled license files for details
- net.sf.py4j:py4j:0.10.8.1
-- com.google.protobuf:protobuf-java:3.7.1
+- com.google.protobuf:protobuf-java:3.11.1
This project bundles the following dependencies under the MIT license. (https://opensource.org/licenses/MIT)
See bundled license files for details.
@@ -43,35 +43,35 @@ See bundled license files for details.
The bundled Apache Beam dependencies bundle the following dependencies under the Apache Software License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
- com.google.api.grpc:proto-google-common-protos:1.12.0
-- com.google.code.gson:gson:2.7
+- com.google.code.gson:gson:2.8.6
- com.google.guava:guava:26.0-jre
-- io.grpc:grpc-auth:1.21.0
-- io.grpc:grpc-core:1.21.0
-- io.grpc:grpc-context:1.21.0
-- io.grpc:grpc-netty:1.21.0
-- io.grpc:grpc-protobuf:1.21.0
-- io.grpc:grpc-stub:1.21.0
-- io.grpc:grpc-testing:1.21.0
-- io.netty:netty-buffer:4.1.34.Final
-- io.netty:netty-codec:4.1.34.Final
-- io.netty:netty-codec-http:4.1.34.Final
-- io.netty:netty-codec-http2:4.1.34.Final
-- io.netty:netty-codec-socks:4.1.34.Final
-- io.netty:netty-common:4.1.34.Final
-- io.netty:netty-handler:4.1.34.Final
-- io.netty:netty-handler-proxy:4.1.34.Final
-- io.netty:netty-resolver:4.1.34.Final
-- io.netty:netty-transport:4.1.34.Final
-- io.netty:netty-transport-native-epoll:4.1.34.Final
-- io.netty:netty-transport-native-unix-common:4.1.34.Final
-- io.netty:netty-tcnative-boringssl-static:2.0.22.Final
-- io.opencensus:opencensus-api:0.21.0
-- io.opencensus:opencensus-contrib-grpc-metrics:0.21.0
-- net.bytebuddy:1.9.3
+- io.grpc:grpc-auth:1.26.0
+- io.grpc:grpc-core:1.26.0
+- io.grpc:grpc-context:1.26.0
+- io.grpc:grpc-netty:1.26.0
+- io.grpc:grpc-protobuf:1.26.0
+- io.grpc:grpc-stub:1.26.0
+- io.grpc:grpc-testing:1.26.0
+- io.netty:netty-buffer:4.1.42.Final
+- io.netty:netty-codec:4.1.42.Final
+- io.netty:netty-codec-http:4.1.42.Final
+- io.netty:netty-codec-http2:4.1.42.Final
+- io.netty:netty-codec-socks:4.1.42.Final
+- io.netty:netty-common:4.1.42.Final
+- io.netty:netty-handler:4.1.42.Final
+- io.netty:netty-handler-proxy:4.1.42.Final
+- io.netty:netty-resolver:4.1.42.Final
+- io.netty:netty-transport:4.1.42.Final
+- io.netty:netty-transport-native-epoll:4.1.42.Final
+- io.netty:netty-transport-native-unix-common:4.1.42.Final
+- io.netty:netty-tcnative-boringssl-static:2.0.26.Final
+- io.opencensus:opencensus-api:0.24.0
+- io.opencensus:opencensus-contrib-grpc-metrics:0.24.0
+- net.bytebuddy:1.10.8
The bundled Apache Beam dependencies bundle the following dependencies under the BSD license.
See bundled license files for details
-- com.google.auth:google-auth-library-credentials:0.13.0
-- com.google.protobuf:protobuf-java:3.7.1
-- com.google.protobuf:protobuf-java-util:3.7.1
+- com.google.auth:google-auth-library-credentials:0.18.0
+- com.google.protobuf:protobuf-java:3.11.0
+- com.google.protobuf:protobuf-java-util:3.11.0
diff --git a/flink-python/src/test/java/org/apache/flink/python/metric/FlinkMetricContainerTest.java b/flink-python/src/test/java/org/apache/flink/python/metric/FlinkMetricContainerTest.java
index f05f1b6..266dab8 100644
--- a/flink-python/src/test/java/org/apache/flink/python/metric/FlinkMetricContainerTest.java
+++ b/flink-python/src/test/java/org/apache/flink/python/metric/FlinkMetricContainerTest.java
@@ -28,9 +28,9 @@ import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.metrics.groups.GenericMetricGroup;
import org.apache.flink.runtime.metrics.groups.MetricGroupTest;
-import org.apache.beam.model.pipeline.v1.MetricsApi;
import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
-import org.apache.beam.runners.core.construction.BeamUrns;
+import org.apache.beam.runners.core.metrics.DistributionData;
+import org.apache.beam.runners.core.metrics.GaugeData;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder;
import org.apache.beam.sdk.metrics.DistributionResult;
@@ -66,9 +66,6 @@ public class FlinkMetricContainerTest {
private FlinkMetricContainer container;
- private static final String GAUGE_URN =
- BeamUrns.getUrn(MetricsApi.MonitoringInfoTypeUrns.Enum.LATEST_INT64_TYPE);
-
private static final List<String> DEFAULT_SCOPE_COMPONENTS = Arrays.asList(
"key",
"value",
@@ -121,11 +118,11 @@ public class FlinkMetricContainerTest {
MonitoringInfo userMonitoringInfo =
new SimpleMonitoringInfoBuilder()
- .setUrn(MonitoringInfoConstants.Urns.USER_COUNTER)
+ .setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64)
.setLabel(MonitoringInfoConstants.Labels.NAMESPACE, DEFAULT_NAMESPACE)
.setLabel(MonitoringInfoConstants.Labels.NAME, "myCounter")
.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "anyPTransform")
- .setInt64Value(111)
+ .setInt64SumValue(111)
.build();
assertThat(userCounter.getCount(), is(0L));
@@ -142,11 +139,11 @@ public class FlinkMetricContainerTest {
MonitoringInfo userMonitoringInfo =
new SimpleMonitoringInfoBuilder()
- .setUrn(MonitoringInfoConstants.Urns.USER_COUNTER)
+ .setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64)
.setLabel(MonitoringInfoConstants.Labels.NAMESPACE, namespace)
.setLabel(MonitoringInfoConstants.Labels.NAME, "myMeter")
.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "anyPTransform")
- .setInt64Value(111)
+ .setInt64SumValue(111)
.build();
assertThat(userMeter.getCount(), is(0L));
assertThat(userMeter.getRate(), is(0.0));
@@ -159,17 +156,12 @@ public class FlinkMetricContainerTest {
@Test
public void testGaugeMonitoringInfoUpdate() {
- MonitoringInfo userMonitoringInfo = MonitoringInfo.newBuilder()
- .setUrn(MonitoringInfoConstants.Urns.USER_COUNTER)
- .putLabels(MonitoringInfoConstants.Labels.NAMESPACE, DEFAULT_NAMESPACE)
- .putLabels(MonitoringInfoConstants.Labels.NAME, "myGauge")
- .putLabels(MonitoringInfoConstants.Labels.PTRANSFORM, "anyPTransform")
- .setMetric(MetricsApi.Metric
- .newBuilder()
- .setCounterData(
- MetricsApi.CounterData.newBuilder()
- .setInt64Value(111L)))
- .setType(GAUGE_URN)
+ MonitoringInfo userMonitoringInfo = new SimpleMonitoringInfoBuilder()
+ .setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64)
+ .setLabel(MonitoringInfoConstants.Labels.NAMESPACE, DEFAULT_NAMESPACE)
+ .setLabel(MonitoringInfoConstants.Labels.NAME, "myGauge")
+ .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "anyPTransform")
+ .setInt64LatestValue(GaugeData.create(111L))
.build();
container.updateMetrics("step", ImmutableList.of(userMonitoringInfo));
@@ -185,21 +177,12 @@ public class FlinkMetricContainerTest {
@Test
public void testDistributionMonitoringInfoUpdate() {
- MonitoringInfo userMonitoringInfo = MonitoringInfo.newBuilder()
- .setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_COUNTER)
- .putLabels(MonitoringInfoConstants.Labels.NAMESPACE, DEFAULT_NAMESPACE)
- .putLabels(MonitoringInfoConstants.Labels.NAME, "myDistribution")
- .putLabels(MonitoringInfoConstants.Labels.PTRANSFORM, "anyPTransform")
- .setMetric(
- MetricsApi.Metric.newBuilder()
- .setDistributionData(
- MetricsApi.DistributionData.newBuilder()
- .setIntDistributionData(
- MetricsApi.IntDistributionData.newBuilder()
- .setSum(30)
- .setCount(10)
- .setMin(1)
- .setMax(5))))
+ MonitoringInfo userMonitoringInfo = new SimpleMonitoringInfoBuilder()
+ .setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_INT64)
+ .setLabel(MonitoringInfoConstants.Labels.NAMESPACE, DEFAULT_NAMESPACE)
+ .setLabel(MonitoringInfoConstants.Labels.NAME, "myDistribution")
+ .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "anyPTransform")
+ .setInt64DistributionValue(DistributionData.create(30, 10, 1, 5))
.build();
container.updateMetrics("step", ImmutableList.of(userMonitoringInfo));
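The metric-test hunks above replace hand-built nested proto messages with Beam 2.23's typed `SimpleMonitoringInfoBuilder` helpers, e.g. `setInt64DistributionValue(DistributionData.create(30, 10, 1, 5))`. To make those four positional fields concrete, here is a hypothetical Python stand-in for the distribution payload (not Beam's actual class, just the sum/count/min/max shape the test asserts against):

```python
from dataclasses import dataclass

# Hypothetical stand-in for Beam's DistributionData.create(sum, count, min, max);
# shown only to make the argument order in the diff above concrete.
@dataclass(frozen=True)
class DistributionData:
    sum: int
    count: int
    min: int
    max: int

    @property
    def mean(self) -> float:
        """Mean of the distribution, guarding against an empty count."""
        return self.sum / self.count if self.count else 0.0

d = DistributionData(sum=30, count=10, min=1, max=5)
```

With the values from the test, the container would report a distribution with mean 3.0 and bounds [1, 5].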
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java
index 7541ad2..57dc439 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java
@@ -25,7 +25,7 @@ import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonStatele
import org.apache.flink.table.types.logical.RowType;
import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
-import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.Struct;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Struct;
import java.util.LinkedList;
import java.util.List;
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java
index 5625cfb..2fc4264 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java
@@ -25,7 +25,7 @@ import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonStatele
import org.apache.flink.table.types.logical.RowType;
import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
-import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.Struct;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Struct;
import java.util.LinkedList;
import java.util.List;
diff --git a/flink-python/tox.ini b/flink-python/tox.ini
index 62c2b9b..909854b 100644
--- a/flink-python/tox.ini
+++ b/flink-python/tox.ini
@@ -21,14 +21,14 @@
# in multiple virtualenvs. This configuration file will run the
# test suite on all supported python versions.
# new environments will be excluded by default unless explicitly added to envlist.
-envlist = {py35, py36, py37}-cython
+envlist = {py35, py36, py37, py38}-cython
[testenv]
whitelist_externals=
/bin/bash
deps =
pytest
- apache-beam==2.19.0
+ apache-beam==2.23.0
cython==0.29.16
grpcio>=1.17.0,<=1.26.0
grpcio-tools>=1.3.5,<=1.14.2
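The `envlist` change above adds a `py38` factor, so tox's brace expansion now produces four environments. A naive sketch of that expansion follows (illustrative only — tox's real parser handles far more syntax than this):

```python
import itertools
import re

def expand_envlist(envlist: str):
    """Naively expand a tox-style factor list like '{py35, py36}-cython'.

    Illustrative only; tox's actual envlist grammar is richer.
    """
    parts = []
    # Split on brace groups while keeping them, then expand each group.
    for group in re.split(r"(\{[^}]*\})", envlist):
        if group.startswith("{"):
            parts.append([p.strip() for p in group[1:-1].split(",")])
        elif group:
            parts.append([group])
    return ["".join(combo) for combo in itertools.product(*parts)]

envs = expand_envlist("{py35, py36, py37, py38}-cython")
# -> ['py35-cython', 'py36-cython', 'py37-cython', 'py38-cython']
```

The single-line diff therefore grows the CI matrix from three Cython test environments to four, matching the new Python 3.8 support.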
diff --git a/pom.xml b/pom.xml
index 7a7e586..7afb7a9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -133,8 +133,8 @@ under the License.
<powermock.version>2.0.4</powermock.version>
<hamcrest.version>1.3</hamcrest.version>
<py4j.version>0.10.8.1</py4j.version>
- <beam.version>2.19.0</beam.version>
- <protoc.version>3.7.1</protoc.version>
+ <beam.version>2.23.0</beam.version>
+ <protoc.version>3.11.1</protoc.version>
<arrow.version>0.16.0</arrow.version>
<japicmp.skip>false</japicmp.skip>
<flink.convergence.phase>validate</flink.convergence.phase>
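The `beam.version` bump in pom.xml must stay in lockstep with the Python-side pin (`apache-beam==2.23.0` in tox.ini and dev-requirements.txt) — a skew between the two breaks the Fn API handshake. A tiny hypothetical consistency check, with inline snippets standing in for the real files on disk:

```python
import re

# Hypothetical consistency check between the Maven property and the pip pin.
# The two snippets stand in for reading pom.xml and tox.ini from disk.
POM_SNIPPET = "<beam.version>2.23.0</beam.version>"
TOX_SNIPPET = "apache-beam==2.23.0"

def beam_versions_match(pom_text: str, tox_text: str) -> bool:
    """Return True iff both files pin the same Apache Beam version."""
    pom = re.search(r"<beam\.version>([^<]+)</beam\.version>", pom_text)
    tox = re.search(r"apache-beam==(\S+)", tox_text)
    return bool(pom and tox) and pom.group(1) == tox.group(1)
```

A check like this is the kind of guard that would have flagged a pom.xml-only upgrade; whether the Flink build enforces it automatically is not shown in this diff.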