You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by di...@apache.org on 2020/09/08 02:06:28 UTC

[flink] branch master updated: [FLINK-19131][python] Add support of Python 3.8 in PyFlink

This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c1a12e9  [FLINK-19131][python] Add support of Python 3.8 in PyFlink
c1a12e9 is described below

commit c1a12e925b6ef46ad5cf0e0a5723949572550e9b
Author: sunjincheng121 <su...@gmail.com>
AuthorDate: Fri Sep 4 17:32:37 2020 +0800

    [FLINK-19131][python] Add support of Python 3.8 in PyFlink
    
    This closes #13334.
---
 docs/_includes/generated/python_configuration.html |   2 +-
 docs/dev/python/installation.md                    |   4 +-
 docs/dev/python/installation.zh.md                 |   4 +-
 .../table-api-users-guide/udfs/python_udfs.md      |   2 +-
 .../table-api-users-guide/udfs/python_udfs.zh.md   |   2 +-
 .../udfs/vectorized_python_udfs.md                 |   2 +-
 .../udfs/vectorized_python_udfs.zh.md              |   2 +-
 docs/dev/table/sqlClient.md                        |   2 +-
 docs/dev/table/sqlClient.zh.md                     |   2 +-
 docs/flinkDev/building.md                          |   4 +-
 docs/flinkDev/building.zh.md                       |   4 +-
 docs/ops/cli.md                                    |   8 +-
 docs/ops/cli.zh.md                                 |   8 +-
 .../apache/flink/client/cli/CliFrontendParser.java |   2 +-
 flink-python/README.md                             |   2 +-
 flink-python/dev/dev-requirements.txt              |   2 +-
 flink-python/dev/lint-python.sh                    |   6 +-
 .../datastream/stream_execution_environment.py     |   2 +-
 .../pyflink/fn_execution/beam/beam_boot.py         |  10 +-
 .../fn_execution/beam/beam_operations_fast.pyx     |   7 +-
 .../fn_execution/beam/beam_operations_slow.py      |   7 +-
 .../fn_execution/tests/test_process_mode_boot.py   |  11 -
 flink-python/pyflink/table/table_config.py         |   2 +-
 flink-python/setup.py                              |   9 +-
 .../fnexecution/state/GrpcStateService.java        |   4 +-
 .../grpc/v1p21p0/io/netty/buffer/PoolArena.java    | 818 ---------------------
 .../v1p21p0/io/netty/buffer/PoolThreadCache.java   | 508 -------------
 .../io/netty/buffer/PooledByteBufAllocator.java    | 640 ----------------
 ...eamDataStreamPythonStatelessFunctionRunner.java |   2 +-
 .../org/apache/flink/python/PythonOptions.java     |   2 +-
 .../flink/python/metric/FlinkMetricContainer.java  |  11 +-
 .../python/beam/BeamPythonFunctionRunner.java      |   5 +-
 .../beam/BeamPythonStatelessFunctionRunner.java    |  26 +-
 .../BeamTablePythonStatelessFunctionRunner.java    |   2 +-
 flink-python/src/main/resources/META-INF/NOTICE    |  78 +-
 .../python/metric/FlinkMetricContainerTest.java    |  53 +-
 .../PassThroughPythonScalarFunctionRunner.java     |   2 +-
 .../PassThroughPythonTableFunctionRunner.java      |   2 +-
 flink-python/tox.ini                               |   4 +-
 pom.xml                                            |   4 +-
 40 files changed, 147 insertions(+), 2120 deletions(-)

diff --git a/docs/_includes/generated/python_configuration.html b/docs/_includes/generated/python_configuration.html
index 890d025..00406f1 100644
--- a/docs/_includes/generated/python_configuration.html
+++ b/docs/_includes/generated/python_configuration.html
@@ -24,7 +24,7 @@
             <td><h5>python.executable</h5></td>
             <td style="word-wrap: break-word;">"python"</td>
             <td>String</td>
-            <td>Specify the path of the python interpreter used to execute the python UDF worker. The python UDF worker depends on Python 3.5+, Apache Beam (version == 2.19.0), Pip (version &gt;= 7.1.0) and SetupTools (version &gt;= 37.0.0). Please ensure that the specified environment meets the above requirements. The option is equivalent to the command line option "-pyexec".</td>
+            <td>Specify the path of the python interpreter used to execute the python UDF worker. The python UDF worker depends on Python 3.5+, Apache Beam (version == 2.23.0), Pip (version &gt;= 7.1.0) and SetupTools (version &gt;= 37.0.0). Please ensure that the specified environment meets the above requirements. The option is equivalent to the command line option "-pyexec".</td>
         </tr>
         <tr>
             <td><h5>python.files</h5></td>
diff --git a/docs/dev/python/installation.md b/docs/dev/python/installation.md
index 82c3737..7bbcd74 100644
--- a/docs/dev/python/installation.md
+++ b/docs/dev/python/installation.md
@@ -26,11 +26,11 @@ under the License.
 {:toc}
 
 ## Environment Requirements
-<span class="label label-info">Note</span> Python version (3.5, 3.6 or 3.7) is required for PyFlink. Please run the following command to make sure that it meets the requirements:
+<span class="label label-info">Note</span> Python version (3.5, 3.6, 3.7 or 3.8) is required for PyFlink. Please run the following command to make sure that it meets the requirements:
 
 {% highlight bash %}
 $ python --version
-# the version printed here must be 3.5, 3.6 or 3.7
+# the version printed here must be 3.5, 3.6, 3.7 or 3.8
 {% endhighlight %}
 
 ## Installation of PyFlink
diff --git a/docs/dev/python/installation.zh.md b/docs/dev/python/installation.zh.md
index 3df0dc9..8a0209a 100644
--- a/docs/dev/python/installation.zh.md
+++ b/docs/dev/python/installation.zh.md
@@ -26,11 +26,11 @@ under the License.
 {:toc}
 
 ## 环境要求
-<span class="label label-info">注意</span> PyFlink需要特定的Python版本(3.5, 3.6 或 3.7)。请运行以下命令,以确保Python版本满足要求。
+<span class="label label-info">注意</span> PyFlink需要特定的Python版本(3.5, 3.6, 3.7 或 3.8)。请运行以下命令,以确保Python版本满足要求。
 
 {% highlight bash %}
 $ python --version
-# the version printed here must be 3.5, 3.6 or 3.7
+# the version printed here must be 3.5, 3.6, 3.7 or 3.8
 {% endhighlight %}
 
 ## PyFlink 安装
diff --git a/docs/dev/python/table-api-users-guide/udfs/python_udfs.md b/docs/dev/python/table-api-users-guide/udfs/python_udfs.md
index 84f661e..901ba5a 100644
--- a/docs/dev/python/table-api-users-guide/udfs/python_udfs.md
+++ b/docs/dev/python/table-api-users-guide/udfs/python_udfs.md
@@ -25,7 +25,7 @@ under the License.
 
 User-defined functions are important features, because they significantly extend the expressiveness of Python Table API programs.
 
-**NOTE:** Python UDF execution requires Python version (3.5, 3.6 or 3.7) with PyFlink installed. It's required on both the client side and the cluster side. 
+**NOTE:** Python UDF execution requires Python version (3.5, 3.6, 3.7 or 3.8) with PyFlink installed. It's required on both the client side and the cluster side. 
 
 * This will be replaced by the TOC
 {:toc}
diff --git a/docs/dev/python/table-api-users-guide/udfs/python_udfs.zh.md b/docs/dev/python/table-api-users-guide/udfs/python_udfs.zh.md
index 94ad4f1..eaf38c9 100644
--- a/docs/dev/python/table-api-users-guide/udfs/python_udfs.zh.md
+++ b/docs/dev/python/table-api-users-guide/udfs/python_udfs.zh.md
@@ -25,7 +25,7 @@ under the License.
 
 用户自定义函数是重要的功能,因为它们极大地扩展了Python Table API程序的表达能力。
 
-**注意:** 要执行Python用户自定义函数,客户端和集群端都需要安装Python版本3.5、3.6或3.7,并安装PyFlink。
+**注意:** 要执行Python用户自定义函数,客户端和集群端都需要安装Python版本(3.5、3.6、3.7 或 3.8),并安装PyFlink。
 
 * This will be replaced by the TOC
 {:toc}
diff --git a/docs/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.md b/docs/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.md
index 8337f69..18c60d0 100644
--- a/docs/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.md
+++ b/docs/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.md
@@ -30,7 +30,7 @@ These Python libraries are highly optimized and provide high-performance data st
 [non-vectorized user-defined functions]({% link dev/python/table-api-users-guide/udfs/python_udfs.md %}) on how to define vectorized user-defined functions.
 Users only need to add an extra parameter `udf_type="pandas"` in the decorator `udf` to mark it as a vectorized user-defined function.
 
-**NOTE:** Python UDF execution requires Python version (3.5, 3.6 or 3.7) with PyFlink installed. It's required on both the client side and the cluster side. 
+**NOTE:** Python UDF execution requires Python version (3.5, 3.6, 3.7 or 3.8) with PyFlink installed. It's required on both the client side and the cluster side. 
 
 * This will be replaced by the TOC
 {:toc}
diff --git a/docs/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.zh.md b/docs/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.zh.md
index 375d084..a03474f 100644
--- a/docs/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.zh.md
+++ b/docs/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.zh.md
@@ -29,7 +29,7 @@ under the License.
 向量化用户自定义函数的定义,与[非向量化用户自定义函数]({% link dev/python/table-api-users-guide/udfs/python_udfs.zh.md %})具有相似的方式,
 用户只需要在调用`udf`装饰器时添加一个额外的参数`udf_type="pandas"`,将其标记为一个向量化用户自定义函数即可。
 
-**注意:**要执行Python UDF,需要安装PyFlink的Python版本(3.5、3.6或3.7)。客户端和群集端都需要安装它。
+**注意:**要执行Python UDF,需要安装PyFlink的Python版本(3.5、3.6、3.7 或 3.8)。客户端和群集端都需要安装它。
 
 * This will be replaced by the TOC
 {:toc}
diff --git a/docs/dev/table/sqlClient.md b/docs/dev/table/sqlClient.md
index 4acecbe..1818728 100644
--- a/docs/dev/table/sqlClient.md
+++ b/docs/dev/table/sqlClient.md
@@ -222,7 +222,7 @@ Mode "embedded" submits Flink jobs from the local machine.
                                            --pyExecutable
                                            /usr/local/bin/python3). The python
                                            UDF worker depends on Python 3.5+,
-                                           Apache Beam (version == 2.19.0), Pip
+                                           Apache Beam (version == 2.23.0), Pip
                                            (version >= 7.1.0) and SetupTools
                                            (version >= 37.0.0). Please ensure
                                            that the specified environment meets
diff --git a/docs/dev/table/sqlClient.zh.md b/docs/dev/table/sqlClient.zh.md
index 69df6f0..a25ce3a 100644
--- a/docs/dev/table/sqlClient.zh.md
+++ b/docs/dev/table/sqlClient.zh.md
@@ -223,7 +223,7 @@ Mode "embedded" submits Flink jobs from the local machine.
                                            --pyExecutable
                                            /usr/local/bin/python3). The python
                                            UDF worker depends on Python 3.5+,
-                                           Apache Beam (version == 2.19.0), Pip
+                                           Apache Beam (version == 2.23.0), Pip
                                            (version >= 7.1.0) and SetupTools
                                            (version >= 37.0.0). Please ensure
                                            that the specified environment meets
diff --git a/docs/flinkDev/building.md b/docs/flinkDev/building.md
index 017e5cd..421ff07 100644
--- a/docs/flinkDev/building.md
+++ b/docs/flinkDev/building.md
@@ -64,11 +64,11 @@ mvn clean install -DskipTests -Dfast
 
     If you want to build a PyFlink package that can be used for pip installation, you need to build the Flink project first, as described in [Build Flink](#build-flink).
 
-2. Python version(3.5, 3.6 or 3.7) is required
+2. Python version(3.5, 3.6, 3.7 or 3.8) is required
 
     ```shell
     $ python --version
-    # the version printed here must be 3.5, 3.6 or 3.7
+    # the version printed here must be 3.5, 3.6, 3.7 or 3.8
     ```
 
 3. Build PyFlink with Cython extension support (optional)
diff --git a/docs/flinkDev/building.zh.md b/docs/flinkDev/building.zh.md
index 22ae108..9511b93 100644
--- a/docs/flinkDev/building.zh.md
+++ b/docs/flinkDev/building.zh.md
@@ -65,11 +65,11 @@ mvn clean install -DskipTests -Dfast
 
     如果想构建一个可用于 pip 安装的 PyFlink 包,需要先构建 Flink 工程,如 [构建 Flink](#build-flink) 中所述。
 
-2. Python 的版本为 3.5, 3.6 或者 3.7.
+2. Python 的版本为 3.5, 3.6, 3.7 或者 3.8.
 
     ```shell
     $ python --version
-    # the version printed here must be 3.5, 3.6 or 3.7
+    # the version printed here must be 3.5, 3.6, 3.7 or 3.8
     ```
 
 3. 构建 PyFlink 的 Cython 扩展模块(可选的)
diff --git a/docs/ops/cli.md b/docs/ops/cli.md
index aad9a78..11e7151 100644
--- a/docs/ops/cli.md
+++ b/docs/ops/cli.md
@@ -118,11 +118,11 @@ These examples about how to submit a job in CLI.
 
 <div data-lang="python" markdown="1">
 
-<span class="label label-info">Note</span> When submitting Python job via `flink run`, Flink will run the command “python”. Please run the following command to confirm that the command “python” in current environment points to a specified Python version 3.5, 3.6 or 3.7:
+<span class="label label-info">Note</span> When submitting Python job via `flink run`, Flink will run the command “python”. Please run the following command to confirm that the command “python” in current environment points to a specified Python version 3.5, 3.6, 3.7 or 3.8:
 
 {% highlight bash %}
 $ python --version
-# the version printed here must be 3.5, 3.6 or 3.7
+# the version printed here must be 3.5, 3.6, 3.7 or 3.8
 {% endhighlight %}
 
 -   Run Python Table program:
@@ -374,8 +374,8 @@ Action "run" compiles and runs a program.
                                           UDF worker (e.g.: --pyExecutable
                                           /usr/local/bin/python3). The python
                                           UDF worker depends on a specified Python
-                                          version 3.5, 3.6 or 3.7, Apache Beam
-                                          (version == 2.19.0), Pip (version >= 7.1.0)
+                                          version 3.5, 3.6 3.7 or 3.8, Apache Beam
+                                          (version == 2.23.0), Pip (version >= 7.1.0)
                                           and SetupTools (version >= 37.0.0).
                                           Please ensure that the specified environment
                                           meets the above requirements.
diff --git a/docs/ops/cli.zh.md b/docs/ops/cli.zh.md
index 5739227..81b1307 100644
--- a/docs/ops/cli.zh.md
+++ b/docs/ops/cli.zh.md
@@ -118,11 +118,11 @@ option.
 
 <div data-lang="python" markdown="1">
 
-<span class="label label-info">注意</span> 通过`flink run`提交Python任务时Flink会调用“python”命令。请执行以下命令以确认当前环境下的指令“python”指向Python的版本为3.5, 3.6 或者 3.7中的一个:
+<span class="label label-info">注意</span> 通过`flink run`提交Python任务时Flink会调用“python”命令。请执行以下命令以确认当前环境下的指令“python”指向Python的版本为3.5、3.6、3.7 或 3.8 中的一个:
 
 {% highlight bash %}
 $ python --version
-# the version printed here must be 3.5, 3.6 or 3.7
+# the version printed here must be 3.5, 3.6, 3.7 or 3.8
 {% endhighlight %}
 
 -   提交一个Python Table的作业:
@@ -373,8 +373,8 @@ Action "run" compiles and runs a program.
                                           UDF worker (e.g.: --pyExecutable
                                           /usr/local/bin/python3). The python
                                           UDF worker depends on a specified Python
-                                          version 3.5, 3.6 or 3.7, Apache Beam
-                                          (version == 2.19.0), Pip (version >= 7.1.0)
+                                          version 3.5, 3.6 3.7 or 3.8, Apache Beam
+                                          (version == 2.23.0), Pip (version >= 7.1.0)
                                           and SetupTools (version >= 37.0.0).
                                           Please ensure that the specified environment
                                           meets the above requirements.
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index 10f79ad..0d82334 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -154,7 +154,7 @@ public class CliFrontendParser {
 	public static final Option PYEXEC_OPTION = new Option("pyexec", "pyExecutable", true,
 		"Specify the path of the python interpreter used to execute the python UDF worker " +
 			"(e.g.: --pyExecutable /usr/local/bin/python3). " +
-			"The python UDF worker depends on Python 3.5+, Apache Beam (version == 2.19.0), " +
+			"The python UDF worker depends on Python 3.5+, Apache Beam (version == 2.23.0), " +
 			"Pip (version >= 7.1.0) and SetupTools (version >= 37.0.0). " +
 			"Please ensure that the specified environment meets the above requirements.");
 
diff --git a/flink-python/README.md b/flink-python/README.md
index ea7bb16..ec87028 100644
--- a/flink-python/README.md
+++ b/flink-python/README.md
@@ -16,7 +16,7 @@ The auto-generated Python docs can be found at [https://ci.apache.org/projects/f
 
 ## Python Requirements
 
-Apache Flink Python API depends on Py4J (currently version 0.10.8.1), CloudPickle (currently version 1.2.2), python-dateutil(currently version 2.8.0), Apache Beam (currently version 2.19.0) and jsonpickle (currently 1.2).
+Apache Flink Python API depends on Py4J (currently version 0.10.8.1), CloudPickle (currently version 1.2.2), python-dateutil(currently version 2.8.0), Apache Beam (currently version 2.23.0) and jsonpickle (currently 1.2).
 
 ## Development Notices
 
diff --git a/flink-python/dev/dev-requirements.txt b/flink-python/dev/dev-requirements.txt
index 5da68b2..08faa45 100755
--- a/flink-python/dev/dev-requirements.txt
+++ b/flink-python/dev/dev-requirements.txt
@@ -14,5 +14,5 @@
 # limitations under the License.
 setuptools>=18.0
 wheel
-apache-beam==2.19.0
+apache-beam==2.23.0
 cython==0.29.16
diff --git a/flink-python/dev/lint-python.sh b/flink-python/dev/lint-python.sh
index cb65ff8..d7ccbab 100755
--- a/flink-python/dev/lint-python.sh
+++ b/flink-python/dev/lint-python.sh
@@ -219,7 +219,7 @@ function install_miniconda() {
 
 # Install some kinds of py env.
 function install_py_env() {
-    py_env=("3.5" "3.6" "3.7")
+    py_env=("3.5" "3.6" "3.7" "3.8")
     for ((i=0;i<${#py_env[@]};i++)) do
         if [ -d "$CURRENT_DIR/.conda/envs/${py_env[i]}" ]; then
             rm -rf "$CURRENT_DIR/.conda/envs/${py_env[i]}"
@@ -357,7 +357,7 @@ function install_environment() {
     print_function "STEP" "install miniconda... [SUCCESS]"
 
     # step-3 install python environment whcih includes
-    # 3.5 3.6 3.7
+    # 3.5 3.6 3.7 3.8
     if [ $STEP -lt 3 ] && [ `need_install_component "py_env"` = true ]; then
         print_function "STEP" "installing python environment..."
         install_py_env
@@ -696,7 +696,7 @@ usage: $0 [options]
 -l          list all checks supported.
 Examples:
   ./lint-python -s basic        =>  install environment with basic components.
-  ./lint-python -s py_env       =>  install environment with python env(3.5,3.6,3.7).
+  ./lint-python -s py_env       =>  install environment with python env(3.5,3.6,3.7,3.8).
   ./lint-python -s all          =>  install environment with all components such as python env,tox,flake8,sphinx etc.
   ./lint-python -s tox,flake8   =>  install environment with tox,flake8.
   ./lint-python -s tox -f       =>  reinstall environment with tox.
diff --git a/flink-python/pyflink/datastream/stream_execution_environment.py b/flink-python/pyflink/datastream/stream_execution_environment.py
index 099f0ef..c25a7db 100644
--- a/flink-python/pyflink/datastream/stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/stream_execution_environment.py
@@ -543,7 +543,7 @@ class StreamExecutionEnvironment(object):
 
         .. note::
 
-            The python udf worker depends on Apache Beam (version == 2.19.0).
+            The python udf worker depends on Apache Beam (version == 2.23.0).
             Please ensure that the specified environment meets the above requirements.
 
         :param python_exec: The path of python interpreter.
diff --git a/flink-python/pyflink/fn_execution/beam/beam_boot.py b/flink-python/pyflink/fn_execution/beam/beam_boot.py
index f460ae6..b0c42dc 100644
--- a/flink-python/pyflink/fn_execution/beam/beam_boot.py
+++ b/flink-python/pyflink/fn_execution/beam/beam_boot.py
@@ -58,27 +58,19 @@ if __name__ == "__main__":
     parser = argparse.ArgumentParser()
 
     parser.add_argument("--id", default="", help="Local identifier (required).")
-    parser.add_argument("--logging_endpoint", default="",
-                        help="Logging endpoint (required).")
     parser.add_argument("--provision_endpoint", default="",
                         help="Provision endpoint (required).")
-    parser.add_argument("--control_endpoint", default="",
-                        help="Control endpoint (required).")
     parser.add_argument("--semi_persist_dir", default="/tmp",
                         help="Local semi-persistent directory (optional).")
 
     args = parser.parse_known_args()[0]
 
     worker_id = args.id
-    logging_endpoint = args.logging_endpoint
     provision_endpoint = args.provision_endpoint
-    control_endpoint = args.control_endpoint
     semi_persist_dir = args.semi_persist_dir
 
     check_not_empty(worker_id, "No id provided.")
-    check_not_empty(logging_endpoint, "No logging endpoint provided.")
     check_not_empty(provision_endpoint, "No provision endpoint provided.")
-    check_not_empty(control_endpoint, "No control endpoint provided.")
 
     logging.info("Initializing python harness: %s" % " ".join(sys.argv))
 
@@ -89,6 +81,8 @@ if __name__ == "__main__":
         client = ProvisionServiceStub(channel=channel)
         info = client.GetProvisionInfo(GetProvisionInfoRequest(), metadata=metadata).info
         options = json_format.MessageToJson(info.pipeline_options)
+        logging_endpoint = info.logging_endpoint.url
+        control_endpoint = info.control_endpoint.url
 
     os.environ["WORKER_ID"] = worker_id
     os.environ["PIPELINE_OPTIONS"] = options
diff --git a/flink-python/pyflink/fn_execution/beam/beam_operations_fast.pyx b/flink-python/pyflink/fn_execution/beam/beam_operations_fast.pyx
index 8e72376..093ca2e 100644
--- a/flink-python/pyflink/fn_execution/beam/beam_operations_fast.pyx
+++ b/flink-python/pyflink/fn_execution/beam/beam_operations_fast.pyx
@@ -108,8 +108,11 @@ cdef class BeamStatelessFunctionOperation(Operation):
             str(tag)] = receiver.opcounter.element_counter.value()
         return metrics
 
-    cpdef monitoring_infos(self, transform_id):
-        # only pass user metric to Java
+    cpdef monitoring_infos(self, transform_id, tag_to_pcollection_id):
+        """
+        Only pass user metric to Java
+        :param tag_to_pcollection_id: useless for user metric
+        """
         return self.user_monitoring_infos(transform_id)
 
     cdef void _update_gauge(self, base_metric_group):
diff --git a/flink-python/pyflink/fn_execution/beam/beam_operations_slow.py b/flink-python/pyflink/fn_execution/beam/beam_operations_slow.py
index 4628f3a..9906758 100644
--- a/flink-python/pyflink/fn_execution/beam/beam_operations_slow.py
+++ b/flink-python/pyflink/fn_execution/beam/beam_operations_slow.py
@@ -85,8 +85,11 @@ class StatelessFunctionOperation(Operation):
             self._value_coder_impl.encode_to_stream(self.func(o.value), output_stream, True)
             output_stream.maybe_flush()
 
-    def monitoring_infos(self, transform_id):
-        # only pass user metric to Java
+    def monitoring_infos(self, transform_id, tag_to_pcollection_id):
+        """
+        Only pass user metric to Java
+        :param tag_to_pcollection_id: useless for user metric
+        """
         return super().user_monitoring_infos(transform_id)
 
     def generate_func(self, udfs) -> tuple:
diff --git a/flink-python/pyflink/fn_execution/tests/test_process_mode_boot.py b/flink-python/pyflink/fn_execution/tests/test_process_mode_boot.py
index af80ed4..402b09a 100644
--- a/flink-python/pyflink/fn_execution/tests/test_process_mode_boot.py
+++ b/flink-python/pyflink/fn_execution/tests/test_process_mode_boot.py
@@ -101,19 +101,8 @@ class PythonBootTests(PyFlinkTestCase):
 
         args = [self.runner_path, "--id", "1"]
         exit_message = subprocess.check_output(args, env=self.env).decode("utf-8")
-        self.assertIn("No logging endpoint provided.", exit_message)
-
-        args = [self.runner_path, "--id", "1",
-                "--logging_endpoint", "localhost:0000"]
-        exit_message = subprocess.check_output(args, env=self.env).decode("utf-8")
         self.assertIn("No provision endpoint provided.", exit_message)
 
-        args = [self.runner_path, "--id", "1",
-                "--logging_endpoint", "localhost:0000",
-                "--provision_endpoint", "localhost:%d" % self.provision_port]
-        exit_message = subprocess.check_output(args, env=self.env).decode("utf-8")
-        self.assertIn("No control endpoint provided.", exit_message)
-
     def test_set_working_directory(self):
         JProcessPythonEnvironmentManager = \
             get_gateway().jvm.org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager
diff --git a/flink-python/pyflink/table/table_config.py b/flink-python/pyflink/table/table_config.py
index 6e0b738..14aab37 100644
--- a/flink-python/pyflink/table/table_config.py
+++ b/flink-python/pyflink/table/table_config.py
@@ -366,7 +366,7 @@ class TableConfig(object):
 
         .. note::
 
-            The python udf worker depends on Apache Beam (version == 2.19.0).
+            The python udf worker depends on Apache Beam (version == 2.23.0).
             Please ensure that the specified environment meets the above requirements.
 
         :param python_exec: The path of python interpreter.
diff --git a/flink-python/setup.py b/flink-python/setup.py
index 3ae7258..176e2ac8 100644
--- a/flink-python/setup.py
+++ b/flink-python/setup.py
@@ -314,9 +314,11 @@ run sdist.
         author='Apache Software Foundation',
         author_email='dev@flink.apache.org',
         python_requires='>=3.5',
-        install_requires=['py4j==0.10.8.1', 'python-dateutil==2.8.0', 'apache-beam==2.19.0',
+        install_requires=['py4j==0.10.8.1', 'python-dateutil==2.8.0', 'apache-beam==2.23.0',
                           'cloudpickle==1.2.2', 'avro-python3>=1.8.1,<=1.9.1', 'jsonpickle==1.2',
-                          'pandas>=0.23.4,<=0.25.3', 'pyarrow>=0.15.1,<0.16.0', 'pytz>=2018.3'],
+                          'pandas>=0.24.2,<1; python_full_version < "3.5.3"',
+                          'pandas>=0.25.2,<1; python_full_version >= "3.5.3"',
+                          'pyarrow>=0.15.1,<0.18.0', 'pytz>=2018.3'],
         cmdclass={'build_ext': build_ext},
         tests_require=['pytest==4.4.1'],
         description='Apache Flink Python API',
@@ -328,7 +330,8 @@ run sdist.
             'License :: OSI Approved :: Apache Software License',
             'Programming Language :: Python :: 3.5',
             'Programming Language :: Python :: 3.6',
-            'Programming Language :: Python :: 3.7'],
+            'Programming Language :: Python :: 3.7',
+            'Programming Language :: Python :: 3.8'],
         ext_modules=extensions
     )
 finally:
diff --git a/flink-python/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java b/flink-python/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java
index ed34e34..113748b 100644
--- a/flink-python/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java
+++ b/flink-python/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java
@@ -28,8 +28,8 @@ import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
 import org.apache.beam.model.fnexecution.v1.BeamFnStateGrpc;
 import org.apache.beam.runners.fnexecution.FnService;
-import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ServerCallStreamObserver;
-import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCallStreamObserver;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.StreamObserver;
 
 /** An implementation of the Beam Fn State service. */
 public class GrpcStateService extends BeamFnStateGrpc.BeamFnStateImplBase
diff --git a/flink-python/src/main/java/org/apache/beam/vendor/grpc/v1p21p0/io/netty/buffer/PoolArena.java b/flink-python/src/main/java/org/apache/beam/vendor/grpc/v1p21p0/io/netty/buffer/PoolArena.java
deleted file mode 100644
index c573759..0000000
--- a/flink-python/src/main/java/org/apache/beam/vendor/grpc/v1p21p0/io/netty/buffer/PoolArena.java
+++ /dev/null
@@ -1,818 +0,0 @@
-/*
- * Copyright 2012 The Netty Project
- *
- * The Netty Project licenses this file to you under the Apache License,
- * version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at:
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.beam.vendor.grpc.v1p21p0.io.netty.buffer;
-
-import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.LongCounter;
-import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.PlatformDependent;
-import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.StringUtil;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
-import static java.lang.Math.max;
-
-// This class is copied from Netty's io.netty.buffer.PoolArena,
-// can be removed after Beam bumps its shaded netty version to 1.22+ (BEAM-9030).
-//
-// Changed lines: 284, 295, 297~300
-
-abstract class PoolArena<T> implements PoolArenaMetric {
-	static final boolean HAS_UNSAFE = PlatformDependent.hasUnsafe();
-
-	enum SizeClass {
-		Tiny,
-		Small,
-		Normal
-	}
-
-	static final int numTinySubpagePools = 512 >>> 4;
-
-	final PooledByteBufAllocator parent;
-
-	private final int maxOrder;
-	final int pageSize;
-	final int pageShifts;
-	final int chunkSize;
-	final int subpageOverflowMask;
-	final int numSmallSubpagePools;
-	final int directMemoryCacheAlignment;
-	final int directMemoryCacheAlignmentMask;
-	private final PoolSubpage<T>[] tinySubpagePools;
-	private final PoolSubpage<T>[] smallSubpagePools;
-
-	private final PoolChunkList<T> q050;
-	private final PoolChunkList<T> q025;
-	private final PoolChunkList<T> q000;
-	private final PoolChunkList<T> qInit;
-	private final PoolChunkList<T> q075;
-	private final PoolChunkList<T> q100;
-
-	private final List<PoolChunkListMetric> chunkListMetrics;
-
-	// Metrics for allocations and deallocations
-	private long allocationsNormal;
-	// We need to use the LongCounter here as this is not guarded via synchronized block.
-	private final LongCounter allocationsTiny = PlatformDependent.newLongCounter();
-	private final LongCounter allocationsSmall = PlatformDependent.newLongCounter();
-	private final LongCounter allocationsHuge = PlatformDependent.newLongCounter();
-	private final LongCounter activeBytesHuge = PlatformDependent.newLongCounter();
-
-	private long deallocationsTiny;
-	private long deallocationsSmall;
-	private long deallocationsNormal;
-
-	// We need to use the LongCounter here as this is not guarded via synchronized block.
-	private final LongCounter deallocationsHuge = PlatformDependent.newLongCounter();
-
-	// Number of thread caches backed by this arena.
-	final AtomicInteger numThreadCaches = new AtomicInteger();
-
-	// TODO: Test if adding padding helps under contention
-	//private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
-
-	protected PoolArena(PooledByteBufAllocator parent, int pageSize,
-						int maxOrder, int pageShifts, int chunkSize, int cacheAlignment) {
-		this.parent = parent;
-		this.pageSize = pageSize;
-		this.maxOrder = maxOrder;
-		this.pageShifts = pageShifts;
-		this.chunkSize = chunkSize;
-		directMemoryCacheAlignment = cacheAlignment;
-		directMemoryCacheAlignmentMask = cacheAlignment - 1;
-		subpageOverflowMask = ~(pageSize - 1);
-		tinySubpagePools = newSubpagePoolArray(numTinySubpagePools);
-		for (int i = 0; i < tinySubpagePools.length; i ++) {
-			tinySubpagePools[i] = newSubpagePoolHead(pageSize);
-		}
-
-		numSmallSubpagePools = pageShifts - 9;
-		smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools);
-		for (int i = 0; i < smallSubpagePools.length; i ++) {
-			smallSubpagePools[i] = newSubpagePoolHead(pageSize);
-		}
-
-		q100 = new PoolChunkList<T>(this, null, 100, Integer.MAX_VALUE, chunkSize);
-		q075 = new PoolChunkList<T>(this, q100, 75, 100, chunkSize);
-		q050 = new PoolChunkList<T>(this, q075, 50, 100, chunkSize);
-		q025 = new PoolChunkList<T>(this, q050, 25, 75, chunkSize);
-		q000 = new PoolChunkList<T>(this, q025, 1, 50, chunkSize);
-		qInit = new PoolChunkList<T>(this, q000, Integer.MIN_VALUE, 25, chunkSize);
-
-		q100.prevList(q075);
-		q075.prevList(q050);
-		q050.prevList(q025);
-		q025.prevList(q000);
-		q000.prevList(null);
-		qInit.prevList(qInit);
-
-		List<PoolChunkListMetric> metrics = new ArrayList<PoolChunkListMetric>(6);
-		metrics.add(qInit);
-		metrics.add(q000);
-		metrics.add(q025);
-		metrics.add(q050);
-		metrics.add(q075);
-		metrics.add(q100);
-		chunkListMetrics = Collections.unmodifiableList(metrics);
-	}
-
-	private PoolSubpage<T> newSubpagePoolHead(int pageSize) {
-		PoolSubpage<T> head = new PoolSubpage<T>(pageSize);
-		head.prev = head;
-		head.next = head;
-		return head;
-	}
-
-	@SuppressWarnings("unchecked")
-	private PoolSubpage<T>[] newSubpagePoolArray(int size) {
-		return new PoolSubpage[size];
-	}
-
-	abstract boolean isDirect();
-
-	PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
-		PooledByteBuf<T> buf = newByteBuf(maxCapacity);
-		allocate(cache, buf, reqCapacity);
-		return buf;
-	}
-
-	static int tinyIdx(int normCapacity) {
-		return normCapacity >>> 4;
-	}
-
-	static int smallIdx(int normCapacity) {
-		int tableIdx = 0;
-		int i = normCapacity >>> 10;
-		while (i != 0) {
-			i >>>= 1;
-			tableIdx ++;
-		}
-		return tableIdx;
-	}
-
-	// capacity < pageSize
-	boolean isTinyOrSmall(int normCapacity) {
-		return (normCapacity & subpageOverflowMask) == 0;
-	}
-
-	// normCapacity < 512
-	static boolean isTiny(int normCapacity) {
-		return (normCapacity & 0xFFFFFE00) == 0;
-	}
-
-	private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
-		final int normCapacity = normalizeCapacity(reqCapacity);
-		if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
-			int tableIdx;
-			PoolSubpage<T>[] table;
-			boolean tiny = isTiny(normCapacity);
-			if (tiny) { // < 512
-				if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
-					// was able to allocate out of the cache so move on
-					return;
-				}
-				tableIdx = tinyIdx(normCapacity);
-				table = tinySubpagePools;
-			} else {
-				if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
-					// was able to allocate out of the cache so move on
-					return;
-				}
-				tableIdx = smallIdx(normCapacity);
-				table = smallSubpagePools;
-			}
-
-			final PoolSubpage<T> head = table[tableIdx];
-
-			/**
-			 * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
-			 * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
-			 */
-			synchronized (head) {
-				final PoolSubpage<T> s = head.next;
-				if (s != head) {
-					assert s.doNotDestroy && s.elemSize == normCapacity;
-					long handle = s.allocate();
-					assert handle >= 0;
-					s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);
-					incTinySmallAllocation(tiny);
-					return;
-				}
-			}
-			synchronized (this) {
-				allocateNormal(buf, reqCapacity, normCapacity);
-			}
-
-			incTinySmallAllocation(tiny);
-			return;
-		}
-		if (normCapacity <= chunkSize) {
-			if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
-				// was able to allocate out of the cache so move on
-				return;
-			}
-			synchronized (this) {
-				allocateNormal(buf, reqCapacity, normCapacity);
-				++allocationsNormal;
-			}
-		} else {
-			// Huge allocations are never served via the cache so just call allocateHuge
-			allocateHuge(buf, reqCapacity);
-		}
-	}
-
-	// Method must be called inside synchronized(this) { ... } block
-	private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
-		if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
-			q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
-			q075.allocate(buf, reqCapacity, normCapacity)) {
-			return;
-		}
-
-		// Add a new chunk.
-		PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
-		boolean success = c.allocate(buf, reqCapacity, normCapacity);
-		assert success;
-		qInit.add(c);
-	}
-
-	private void incTinySmallAllocation(boolean tiny) {
-		if (tiny) {
-			allocationsTiny.increment();
-		} else {
-			allocationsSmall.increment();
-		}
-	}
-
-	private void allocateHuge(PooledByteBuf<T> buf, int reqCapacity) {
-		PoolChunk<T> chunk = newUnpooledChunk(reqCapacity);
-		activeBytesHuge.add(chunk.chunkSize());
-		buf.initUnpooled(chunk, reqCapacity);
-		allocationsHuge.increment();
-	}
-
-	void free(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity, PoolThreadCache cache) {
-		if (chunk.unpooled) {
-			int size = chunk.chunkSize();
-			destroyChunk(chunk);
-			activeBytesHuge.add(-size);
-			deallocationsHuge.increment();
-		} else {
-			SizeClass sizeClass = sizeClass(normCapacity);
-			if (cache != null && cache.add(this, chunk, nioBuffer, handle, normCapacity, sizeClass)) {
-				// cached so not free it.
-				return;
-			}
-
-			freeChunk(chunk, handle, sizeClass, nioBuffer, false);
-		}
-	}
-
-	private SizeClass sizeClass(int normCapacity) {
-		if (!isTinyOrSmall(normCapacity)) {
-			return SizeClass.Normal;
-		}
-		return isTiny(normCapacity) ? SizeClass.Tiny : SizeClass.Small;
-	}
-
-	void freeChunk(PoolChunk<T> chunk, long handle, SizeClass sizeClass, ByteBuffer nioBuffer, boolean finalizer) {
-		final boolean destroyChunk;
-		synchronized (this) {
-			// We only call this if freeChunk is not called because of the PoolThreadCache finalizer as otherwise this
-			// may fail due lazy class-loading in for example tomcat.
-			if (!finalizer) {
-				switch (sizeClass) {
-					case Normal:
-						++deallocationsNormal;
-						break;
-					case Small:
-						++deallocationsSmall;
-						break;
-					case Tiny:
-						++deallocationsTiny;
-						break;
-					default:
-						throw new Error();
-				}
-			}
-			destroyChunk = !chunk.parent.free(chunk, handle, nioBuffer);
-		}
-		if (destroyChunk) {
-			// destroyChunk not need to be called while holding the synchronized lock.
-			destroyChunk(chunk);
-		}
-	}
-
-	PoolSubpage<T> findSubpagePoolHead(int elemSize) {
-		int tableIdx;
-		PoolSubpage<T>[] table;
-		if (isTiny(elemSize)) { // < 512
-			tableIdx = elemSize >>> 4;
-			table = tinySubpagePools;
-		} else {
-			tableIdx = 0;
-			elemSize >>>= 10;
-			while (elemSize != 0) {
-				elemSize >>>= 1;
-				tableIdx ++;
-			}
-			table = smallSubpagePools;
-		}
-
-		return table[tableIdx];
-	}
-
-	int normalizeCapacity(int reqCapacity) {
-		checkPositiveOrZero(reqCapacity, "reqCapacity");
-
-		if (reqCapacity >= chunkSize) {
-			return directMemoryCacheAlignment == 0 ? reqCapacity : alignCapacity(reqCapacity);
-		}
-
-		if (!isTiny(reqCapacity)) { // >= 512
-			// Doubled
-
-			int normalizedCapacity = reqCapacity;
-			normalizedCapacity --;
-			normalizedCapacity |= normalizedCapacity >>>  1;
-			normalizedCapacity |= normalizedCapacity >>>  2;
-			normalizedCapacity |= normalizedCapacity >>>  4;
-			normalizedCapacity |= normalizedCapacity >>>  8;
-			normalizedCapacity |= normalizedCapacity >>> 16;
-			normalizedCapacity ++;
-
-			if (normalizedCapacity < 0) {
-				normalizedCapacity >>>= 1;
-			}
-			assert directMemoryCacheAlignment == 0 || (normalizedCapacity & directMemoryCacheAlignmentMask) == 0;
-
-			return normalizedCapacity;
-		}
-
-		if (directMemoryCacheAlignment > 0) {
-			return alignCapacity(reqCapacity);
-		}
-
-		// Quantum-spaced
-		if ((reqCapacity & 15) == 0) {
-			return reqCapacity;
-		}
-
-		return (reqCapacity & ~15) + 16;
-	}
-
-	int alignCapacity(int reqCapacity) {
-		int delta = reqCapacity & directMemoryCacheAlignmentMask;
-		return delta == 0 ? reqCapacity : reqCapacity + directMemoryCacheAlignment - delta;
-	}
-
-	void reallocate(PooledByteBuf<T> buf, int newCapacity, boolean freeOldMemory) {
-		if (newCapacity < 0 || newCapacity > buf.maxCapacity()) {
-			throw new IllegalArgumentException("newCapacity: " + newCapacity);
-		}
-
-		int oldCapacity = buf.length;
-		if (oldCapacity == newCapacity) {
-			return;
-		}
-
-		PoolChunk<T> oldChunk = buf.chunk;
-		ByteBuffer oldNioBuffer = buf.tmpNioBuf;
-		long oldHandle = buf.handle;
-		T oldMemory = buf.memory;
-		int oldOffset = buf.offset;
-		int oldMaxLength = buf.maxLength;
-		int readerIndex = buf.readerIndex();
-		int writerIndex = buf.writerIndex();
-
-		allocate(parent.threadCache(), buf, newCapacity);
-		if (newCapacity > oldCapacity) {
-			memoryCopy(
-				oldMemory, oldOffset,
-				buf.memory, buf.offset, oldCapacity);
-		} else if (newCapacity < oldCapacity) {
-			if (readerIndex < newCapacity) {
-				if (writerIndex > newCapacity) {
-					writerIndex = newCapacity;
-				}
-				memoryCopy(
-					oldMemory, oldOffset + readerIndex,
-					buf.memory, buf.offset + readerIndex, writerIndex - readerIndex);
-			} else {
-				readerIndex = writerIndex = newCapacity;
-			}
-		}
-
-		buf.setIndex(readerIndex, writerIndex);
-
-		if (freeOldMemory) {
-			free(oldChunk, oldNioBuffer, oldHandle, oldMaxLength, buf.cache);
-		}
-	}
-
-	@Override
-	public int numThreadCaches() {
-		return numThreadCaches.get();
-	}
-
-	@Override
-	public int numTinySubpages() {
-		return tinySubpagePools.length;
-	}
-
-	@Override
-	public int numSmallSubpages() {
-		return smallSubpagePools.length;
-	}
-
-	@Override
-	public int numChunkLists() {
-		return chunkListMetrics.size();
-	}
-
-	@Override
-	public List<PoolSubpageMetric> tinySubpages() {
-		return subPageMetricList(tinySubpagePools);
-	}
-
-	@Override
-	public List<PoolSubpageMetric> smallSubpages() {
-		return subPageMetricList(smallSubpagePools);
-	}
-
-	@Override
-	public List<PoolChunkListMetric> chunkLists() {
-		return chunkListMetrics;
-	}
-
-	private static List<PoolSubpageMetric> subPageMetricList(PoolSubpage<?>[] pages) {
-		List<PoolSubpageMetric> metrics = new ArrayList<PoolSubpageMetric>();
-		for (PoolSubpage<?> head : pages) {
-			if (head.next == head) {
-				continue;
-			}
-			PoolSubpage<?> s = head.next;
-			for (;;) {
-				metrics.add(s);
-				s = s.next;
-				if (s == head) {
-					break;
-				}
-			}
-		}
-		return metrics;
-	}
-
-	@Override
-	public long numAllocations() {
-		final long allocsNormal;
-		synchronized (this) {
-			allocsNormal = allocationsNormal;
-		}
-		return allocationsTiny.value() + allocationsSmall.value() + allocsNormal + allocationsHuge.value();
-	}
-
-	@Override
-	public long numTinyAllocations() {
-		return allocationsTiny.value();
-	}
-
-	@Override
-	public long numSmallAllocations() {
-		return allocationsSmall.value();
-	}
-
-	@Override
-	public synchronized long numNormalAllocations() {
-		return allocationsNormal;
-	}
-
-	@Override
-	public long numDeallocations() {
-		final long deallocs;
-		synchronized (this) {
-			deallocs = deallocationsTiny + deallocationsSmall + deallocationsNormal;
-		}
-		return deallocs + deallocationsHuge.value();
-	}
-
-	@Override
-	public synchronized long numTinyDeallocations() {
-		return deallocationsTiny;
-	}
-
-	@Override
-	public synchronized long numSmallDeallocations() {
-		return deallocationsSmall;
-	}
-
-	@Override
-	public synchronized long numNormalDeallocations() {
-		return deallocationsNormal;
-	}
-
-	@Override
-	public long numHugeAllocations() {
-		return allocationsHuge.value();
-	}
-
-	@Override
-	public long numHugeDeallocations() {
-		return deallocationsHuge.value();
-	}
-
-	@Override
-	public  long numActiveAllocations() {
-		long val = allocationsTiny.value() + allocationsSmall.value() + allocationsHuge.value()
-			- deallocationsHuge.value();
-		synchronized (this) {
-			val += allocationsNormal - (deallocationsTiny + deallocationsSmall + deallocationsNormal);
-		}
-		return max(val, 0);
-	}
-
-	@Override
-	public long numActiveTinyAllocations() {
-		return max(numTinyAllocations() - numTinyDeallocations(), 0);
-	}
-
-	@Override
-	public long numActiveSmallAllocations() {
-		return max(numSmallAllocations() - numSmallDeallocations(), 0);
-	}
-
-	@Override
-	public long numActiveNormalAllocations() {
-		final long val;
-		synchronized (this) {
-			val = allocationsNormal - deallocationsNormal;
-		}
-		return max(val, 0);
-	}
-
-	@Override
-	public long numActiveHugeAllocations() {
-		return max(numHugeAllocations() - numHugeDeallocations(), 0);
-	}
-
-	@Override
-	public long numActiveBytes() {
-		long val = activeBytesHuge.value();
-		synchronized (this) {
-			for (int i = 0; i < chunkListMetrics.size(); i++) {
-				for (PoolChunkMetric m: chunkListMetrics.get(i)) {
-					val += m.chunkSize();
-				}
-			}
-		}
-		return max(0, val);
-	}
-
-	protected abstract PoolChunk<T> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize);
-	protected abstract PoolChunk<T> newUnpooledChunk(int capacity);
-	protected abstract PooledByteBuf<T> newByteBuf(int maxCapacity);
-	protected abstract void memoryCopy(T src, int srcOffset, T dst, int dstOffset, int length);
-	protected abstract void destroyChunk(PoolChunk<T> chunk);
-
-	@Override
-	public synchronized String toString() {
-		StringBuilder buf = new StringBuilder()
-			.append("Chunk(s) at 0~25%:")
-			.append(StringUtil.NEWLINE)
-			.append(qInit)
-			.append(StringUtil.NEWLINE)
-			.append("Chunk(s) at 0~50%:")
-			.append(StringUtil.NEWLINE)
-			.append(q000)
-			.append(StringUtil.NEWLINE)
-			.append("Chunk(s) at 25~75%:")
-			.append(StringUtil.NEWLINE)
-			.append(q025)
-			.append(StringUtil.NEWLINE)
-			.append("Chunk(s) at 50~100%:")
-			.append(StringUtil.NEWLINE)
-			.append(q050)
-			.append(StringUtil.NEWLINE)
-			.append("Chunk(s) at 75~100%:")
-			.append(StringUtil.NEWLINE)
-			.append(q075)
-			.append(StringUtil.NEWLINE)
-			.append("Chunk(s) at 100%:")
-			.append(StringUtil.NEWLINE)
-			.append(q100)
-			.append(StringUtil.NEWLINE)
-			.append("tiny subpages:");
-		appendPoolSubPages(buf, tinySubpagePools);
-		buf.append(StringUtil.NEWLINE)
-			.append("small subpages:");
-		appendPoolSubPages(buf, smallSubpagePools);
-		buf.append(StringUtil.NEWLINE);
-
-		return buf.toString();
-	}
-
-	private static void appendPoolSubPages(StringBuilder buf, PoolSubpage<?>[] subpages) {
-		for (int i = 0; i < subpages.length; i ++) {
-			PoolSubpage<?> head = subpages[i];
-			if (head.next == head) {
-				continue;
-			}
-
-			buf.append(StringUtil.NEWLINE)
-				.append(i)
-				.append(": ");
-			PoolSubpage<?> s = head.next;
-			for (;;) {
-				buf.append(s);
-				s = s.next;
-				if (s == head) {
-					break;
-				}
-			}
-		}
-	}
-
-	@Override
-	protected final void finalize() throws Throwable {
-		try {
-			super.finalize();
-		} finally {
-			destroyPoolSubPages(smallSubpagePools);
-			destroyPoolSubPages(tinySubpagePools);
-			destroyPoolChunkLists(qInit, q000, q025, q050, q075, q100);
-		}
-	}
-
-	private static void destroyPoolSubPages(PoolSubpage<?>[] pages) {
-		for (PoolSubpage<?> page : pages) {
-			page.destroy();
-		}
-	}
-
-	private void destroyPoolChunkLists(PoolChunkList<T>... chunkLists) {
-		for (PoolChunkList<T> chunkList: chunkLists) {
-			chunkList.destroy(this);
-		}
-	}
-
-	static final class HeapArena extends PoolArena<byte[]> {
-
-		HeapArena(PooledByteBufAllocator parent, int pageSize, int maxOrder,
-				  int pageShifts, int chunkSize, int directMemoryCacheAlignment) {
-			super(parent, pageSize, maxOrder, pageShifts, chunkSize,
-				directMemoryCacheAlignment);
-		}
-
-		private static byte[] newByteArray(int size) {
-			return PlatformDependent.allocateUninitializedArray(size);
-		}
-
-		@Override
-		boolean isDirect() {
-			return false;
-		}
-
-		@Override
-		protected PoolChunk<byte[]> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) {
-			return new PoolChunk<byte[]>(this, newByteArray(chunkSize), pageSize, maxOrder, pageShifts, chunkSize, 0);
-		}
-
-		@Override
-		protected PoolChunk<byte[]> newUnpooledChunk(int capacity) {
-			return new PoolChunk<byte[]>(this, newByteArray(capacity), capacity, 0);
-		}
-
-		@Override
-		protected void destroyChunk(PoolChunk<byte[]> chunk) {
-			// Rely on GC.
-		}
-
-		@Override
-		protected PooledByteBuf<byte[]> newByteBuf(int maxCapacity) {
-			return HAS_UNSAFE ? PooledUnsafeHeapByteBuf.newUnsafeInstance(maxCapacity)
-				: PooledHeapByteBuf.newInstance(maxCapacity);
-		}
-
-		@Override
-		protected void memoryCopy(byte[] src, int srcOffset, byte[] dst, int dstOffset, int length) {
-			if (length == 0) {
-				return;
-			}
-
-			System.arraycopy(src, srcOffset, dst, dstOffset, length);
-		}
-	}
-
-	static final class DirectArena extends PoolArena<ByteBuffer> {
-
-		DirectArena(PooledByteBufAllocator parent, int pageSize, int maxOrder,
-					int pageShifts, int chunkSize, int directMemoryCacheAlignment) {
-			super(parent, pageSize, maxOrder, pageShifts, chunkSize,
-				directMemoryCacheAlignment);
-		}
-
-		@Override
-		boolean isDirect() {
-			return true;
-		}
-
-		// mark as package-private, only for unit test
-		int offsetCacheLine(ByteBuffer memory) {
-			// We can only calculate the offset if Unsafe is present as otherwise directBufferAddress(...) will
-			// throw an NPE.
-			int remainder = HAS_UNSAFE
-				? (int) (PlatformDependent.directBufferAddress(memory) & directMemoryCacheAlignmentMask)
-				: 0;
-
-			// offset = alignment - address & (alignment - 1)
-			return directMemoryCacheAlignment - remainder;
-		}
-
-		@Override
-		protected PoolChunk<ByteBuffer> newChunk(int pageSize, int maxOrder,
-												 int pageShifts, int chunkSize) {
-			if (directMemoryCacheAlignment == 0) {
-				return new PoolChunk<ByteBuffer>(this,
-					allocateDirect(chunkSize), pageSize, maxOrder,
-					pageShifts, chunkSize, 0);
-			}
-			final ByteBuffer memory = allocateDirect(chunkSize
-				+ directMemoryCacheAlignment);
-			return new PoolChunk<ByteBuffer>(this, memory, pageSize,
-				maxOrder, pageShifts, chunkSize,
-				offsetCacheLine(memory));
-		}
-
-		@Override
-		protected PoolChunk<ByteBuffer> newUnpooledChunk(int capacity) {
-			if (directMemoryCacheAlignment == 0) {
-				return new PoolChunk<ByteBuffer>(this,
-					allocateDirect(capacity), capacity, 0);
-			}
-			final ByteBuffer memory = allocateDirect(capacity
-				+ directMemoryCacheAlignment);
-			return new PoolChunk<ByteBuffer>(this, memory, capacity,
-				offsetCacheLine(memory));
-		}
-
-		private static ByteBuffer allocateDirect(int capacity) {
-			return PlatformDependent.useDirectBufferNoCleaner() ?
-				PlatformDependent.allocateDirectNoCleaner(capacity) : ByteBuffer.allocateDirect(capacity);
-		}
-
-		@Override
-		protected void destroyChunk(PoolChunk<ByteBuffer> chunk) {
-			if (PlatformDependent.useDirectBufferNoCleaner()) {
-				PlatformDependent.freeDirectNoCleaner(chunk.memory);
-			} else {
-				PlatformDependent.freeDirectBuffer(chunk.memory);
-			}
-		}
-
-		@Override
-		protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
-			if (HAS_UNSAFE) {
-				return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
-			} else {
-				return PooledDirectByteBuf.newInstance(maxCapacity);
-			}
-		}
-
-		@Override
-		protected void memoryCopy(ByteBuffer src, int srcOffset, ByteBuffer dst, int dstOffset, int length) {
-			if (length == 0) {
-				return;
-			}
-
-			if (HAS_UNSAFE) {
-				PlatformDependent.copyMemory(
-					PlatformDependent.directBufferAddress(src) + srcOffset,
-					PlatformDependent.directBufferAddress(dst) + dstOffset, length);
-			} else {
-				// We must duplicate the NIO buffers because they may be accessed by other Netty buffers.
-				src = src.duplicate();
-				dst = dst.duplicate();
-				src.position(srcOffset).limit(srcOffset + length);
-				dst.position(dstOffset);
-				dst.put(src);
-			}
-		}
-	}
-}
diff --git a/flink-python/src/main/java/org/apache/beam/vendor/grpc/v1p21p0/io/netty/buffer/PoolThreadCache.java b/flink-python/src/main/java/org/apache/beam/vendor/grpc/v1p21p0/io/netty/buffer/PoolThreadCache.java
deleted file mode 100644
index 37244e5..0000000
--- a/flink-python/src/main/java/org/apache/beam/vendor/grpc/v1p21p0/io/netty/buffer/PoolThreadCache.java
+++ /dev/null
@@ -1,508 +0,0 @@
-/*
- * Copyright 2012 The Netty Project
- *
- * The Netty Project licenses this file to you under the Apache License,
- * version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at:
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.beam.vendor.grpc.v1p21p0.io.netty.buffer;
-
-
-import static org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
-
-import org.apache.beam.vendor.grpc.v1p21p0.io.netty.buffer.PoolArena.SizeClass;
-import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.Recycler;
-import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.Recycler.Handle;
-import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.MathUtil;
-import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.PlatformDependent;
-import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.logging.InternalLogger;
-import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.logging.InternalLoggerFactory;
-
-import java.nio.ByteBuffer;
-import java.util.Queue;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-// This class is copied from Netty's io.netty.buffer.PoolThreadCache,
-// can be removed after Beam bumps its shaded netty version to 1.22+ (BEAM-9030).
-//
-// Changed lines: 235, 242, 246~251, 268, 275, 280, 284, 426~427, 430, 435, 453, 458, 463~467, 469
-
-/**
- * Acts a Thread cache for allocations. This implementation is moduled after
- * <a href="http://people.freebsd.org/~jasone/jemalloc/bsdcan2006/jemalloc.pdf">jemalloc</a> and the descripted
- * technics of
- * <a href="https://www.facebook.com/notes/facebook-engineering/scalable-memory-allocation-using-jemalloc/480222803919">
- * Scalable memory allocation using jemalloc</a>.
- */
-final class PoolThreadCache {
-
-	private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class);
-
-	final PoolArena<byte[]> heapArena;
-	final PoolArena<ByteBuffer> directArena;
-
-	// Hold the caches for the different size classes, which are tiny, small and normal.
-	private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
-	private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
-	private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
-	private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
-	private final MemoryRegionCache<byte[]>[] normalHeapCaches;
-	private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
-
-	// Used for bitshifting when calculate the index of normal caches later
-	private final int numShiftsNormalDirect;
-	private final int numShiftsNormalHeap;
-	private final int freeSweepAllocationThreshold;
-	private final AtomicBoolean freed = new AtomicBoolean();
-
-	private int allocations;
-
-	// TODO: Test if adding padding helps under contention
-	//private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
-
-	PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
-					int tinyCacheSize, int smallCacheSize, int normalCacheSize,
-					int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
-		checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity");
-		this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
-		this.heapArena = heapArena;
-		this.directArena = directArena;
-		if (directArena != null) {
-			tinySubPageDirectCaches = createSubPageCaches(
-				tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
-			smallSubPageDirectCaches = createSubPageCaches(
-				smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);
-
-			numShiftsNormalDirect = log2(directArena.pageSize);
-			normalDirectCaches = createNormalCaches(
-				normalCacheSize, maxCachedBufferCapacity, directArena);
-
-			directArena.numThreadCaches.getAndIncrement();
-		} else {
-			// No directArea is configured so just null out all caches
-			tinySubPageDirectCaches = null;
-			smallSubPageDirectCaches = null;
-			normalDirectCaches = null;
-			numShiftsNormalDirect = -1;
-		}
-		if (heapArena != null) {
-			// Create the caches for the heap allocations
-			tinySubPageHeapCaches = createSubPageCaches(
-				tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
-			smallSubPageHeapCaches = createSubPageCaches(
-				smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small);
-
-			numShiftsNormalHeap = log2(heapArena.pageSize);
-			normalHeapCaches = createNormalCaches(
-				normalCacheSize, maxCachedBufferCapacity, heapArena);
-
-			heapArena.numThreadCaches.getAndIncrement();
-		} else {
-			// No heapArea is configured so just null out all caches
-			tinySubPageHeapCaches = null;
-			smallSubPageHeapCaches = null;
-			normalHeapCaches = null;
-			numShiftsNormalHeap = -1;
-		}
-
-		// Only check if there are caches in use.
-		if ((tinySubPageDirectCaches != null || smallSubPageDirectCaches != null || normalDirectCaches != null
-			|| tinySubPageHeapCaches != null || smallSubPageHeapCaches != null || normalHeapCaches != null)
-			&& freeSweepAllocationThreshold < 1) {
-			throw new IllegalArgumentException("freeSweepAllocationThreshold: "
-				+ freeSweepAllocationThreshold + " (expected: > 0)");
-		}
-	}
-
-	private static <T> MemoryRegionCache<T>[] createSubPageCaches(
-		int cacheSize, int numCaches, SizeClass sizeClass) {
-		if (cacheSize > 0 && numCaches > 0) {
-			@SuppressWarnings("unchecked")
-			MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
-			for (int i = 0; i < cache.length; i++) {
-				// TODO: maybe use cacheSize / cache.length
-				cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
-			}
-			return cache;
-		} else {
-			return null;
-		}
-	}
-
-	private static <T> MemoryRegionCache<T>[] createNormalCaches(
-		int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {
-		if (cacheSize > 0 && maxCachedBufferCapacity > 0) {
-			int max = Math.min(area.chunkSize, maxCachedBufferCapacity);
-			int arraySize = Math.max(1, log2(max / area.pageSize) + 1);
-
-			@SuppressWarnings("unchecked")
-			MemoryRegionCache<T>[] cache = new MemoryRegionCache[arraySize];
-			for (int i = 0; i < cache.length; i++) {
-				cache[i] = new NormalMemoryRegionCache<T>(cacheSize);
-			}
-			return cache;
-		} else {
-			return null;
-		}
-	}
-
-	private static int log2(int val) {
-		int res = 0;
-		while (val > 1) {
-			val >>= 1;
-			res++;
-		}
-		return res;
-	}
-
-	/**
-	 * Try to allocate a tiny buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
-	 */
-	boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
-		return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
-	}
-
-	/**
-	 * Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
-	 */
-	boolean allocateSmall(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
-		return allocate(cacheForSmall(area, normCapacity), buf, reqCapacity);
-	}
-
-	/**
-	 * Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
-	 */
-	boolean allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
-		return allocate(cacheForNormal(area, normCapacity), buf, reqCapacity);
-	}
-
-	@SuppressWarnings({ "unchecked", "rawtypes" })
-	private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
-		if (cache == null) {
-			// no cache found so just return false here
-			return false;
-		}
-		boolean allocated = cache.allocate(buf, reqCapacity);
-		if (++ allocations >= freeSweepAllocationThreshold) {
-			allocations = 0;
-			trim();
-		}
-		return allocated;
-	}
-
-	/**
-	 * Add {@link PoolChunk} and {@code handle} to the cache if there is enough room.
-	 * Returns {@code true} if it fit into the cache {@code false} otherwise.
-	 */
-	@SuppressWarnings({ "unchecked", "rawtypes" })
-	boolean add(PoolArena<?> area, PoolChunk chunk, ByteBuffer nioBuffer,
-				long handle, int normCapacity, SizeClass sizeClass) {
-		MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);
-		if (cache == null) {
-			return false;
-		}
-		return cache.add(chunk, nioBuffer, handle);
-	}
-
-	private MemoryRegionCache<?> cache(PoolArena<?> area, int normCapacity, SizeClass sizeClass) {
-		switch (sizeClass) {
-			case Normal:
-				return cacheForNormal(area, normCapacity);
-			case Small:
-				return cacheForSmall(area, normCapacity);
-			case Tiny:
-				return cacheForTiny(area, normCapacity);
-			default:
-				throw new Error();
-		}
-	}
-
-	/// TODO: In the future when we move to Java9+ we should use java.lang.ref.Cleaner.
-	@Override
-	protected void finalize() throws Throwable {
-		try {
-			super.finalize();
-		} finally {
-			free(true);
-		}
-	}
-
-	/**
-	 *  Should be called if the Thread that uses this cache is about to exist to release resources out of the cache
-	 */
-	void free(boolean finalizer) {
-		// As free() may be called either by the finalizer or by FastThreadLocal.onRemoval(...) we need to ensure
-		// we only call this one time.
-		if (freed.compareAndSet(false, true)) {
-			int numFreed = free(tinySubPageDirectCaches, finalizer) +
-				free(smallSubPageDirectCaches, finalizer) +
-				free(normalDirectCaches, finalizer) +
-				free(tinySubPageHeapCaches, finalizer) +
-				free(smallSubPageHeapCaches, finalizer) +
-				free(normalHeapCaches, finalizer);
-
-			if (numFreed > 0 && logger.isDebugEnabled()) {
-				logger.debug("Freed {} thread-local buffer(s) from thread: {}", numFreed,
-					Thread.currentThread().getName());
-			}
-
-			if (directArena != null) {
-				directArena.numThreadCaches.getAndDecrement();
-			}
-
-			if (heapArena != null) {
-				heapArena.numThreadCaches.getAndDecrement();
-			}
-		}
-	}
-
-	private static int free(MemoryRegionCache<?>[] caches, boolean finalizer) {
-		if (caches == null) {
-			return 0;
-		}
-
-		int numFreed = 0;
-		for (MemoryRegionCache<?> c: caches) {
-			numFreed += free(c, finalizer);
-		}
-		return numFreed;
-	}
-
-	private static int free(MemoryRegionCache<?> cache, boolean finalizer) {
-		if (cache == null) {
-			return 0;
-		}
-		return cache.free(finalizer);
-	}
-
-	void trim() {
-		trim(tinySubPageDirectCaches);
-		trim(smallSubPageDirectCaches);
-		trim(normalDirectCaches);
-		trim(tinySubPageHeapCaches);
-		trim(smallSubPageHeapCaches);
-		trim(normalHeapCaches);
-	}
-
-	private static void trim(MemoryRegionCache<?>[] caches) {
-		if (caches == null) {
-			return;
-		}
-		for (MemoryRegionCache<?> c: caches) {
-			trim(c);
-		}
-	}
-
-	private static void trim(MemoryRegionCache<?> cache) {
-		if (cache == null) {
-			return;
-		}
-		cache.trim();
-	}
-
-	private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
-		int idx = PoolArena.tinyIdx(normCapacity);
-		if (area.isDirect()) {
-			return cache(tinySubPageDirectCaches, idx);
-		}
-		return cache(tinySubPageHeapCaches, idx);
-	}
-
-	private MemoryRegionCache<?> cacheForSmall(PoolArena<?> area, int normCapacity) {
-		int idx = PoolArena.smallIdx(normCapacity);
-		if (area.isDirect()) {
-			return cache(smallSubPageDirectCaches, idx);
-		}
-		return cache(smallSubPageHeapCaches, idx);
-	}
-
-	private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int normCapacity) {
-		if (area.isDirect()) {
-			int idx = log2(normCapacity >> numShiftsNormalDirect);
-			return cache(normalDirectCaches, idx);
-		}
-		int idx = log2(normCapacity >> numShiftsNormalHeap);
-		return cache(normalHeapCaches, idx);
-	}
-
-	private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) {
-		if (cache == null || idx > cache.length - 1) {
-			return null;
-		}
-		return cache[idx];
-	}
-
-	/**
-	 * Cache used for buffers which are backed by TINY or SMALL size.
-	 */
-	private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> {
-		SubPageMemoryRegionCache(int size, SizeClass sizeClass) {
-			super(size, sizeClass);
-		}
-
-		@Override
-		protected void initBuf(
-			PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, PooledByteBuf<T> buf, int reqCapacity) {
-			chunk.initBufWithSubpage(buf, nioBuffer, handle, reqCapacity);
-		}
-	}
-
-	/**
-	 * Cache used for buffers which are backed by NORMAL size.
-	 */
-	private static final class NormalMemoryRegionCache<T> extends MemoryRegionCache<T> {
-		NormalMemoryRegionCache(int size) {
-			super(size, SizeClass.Normal);
-		}
-
-		@Override
-		protected void initBuf(
-			PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, PooledByteBuf<T> buf, int reqCapacity) {
-			chunk.initBuf(buf, nioBuffer, handle, reqCapacity);
-		}
-	}
-
-	private abstract static class MemoryRegionCache<T> {
-		private final int size;
-		private final Queue<Entry<T>> queue;
-		private final SizeClass sizeClass;
-		private int allocations;
-
-		MemoryRegionCache(int size, SizeClass sizeClass) {
-			this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
-			queue = PlatformDependent.newFixedMpscQueue(this.size);
-			this.sizeClass = sizeClass;
-		}
-
-		/**
-		 * Init the {@link PooledByteBuf} using the provided chunk and handle with the capacity restrictions.
-		 */
-		protected abstract void initBuf(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle,
-										PooledByteBuf<T> buf, int reqCapacity);
-
-		/**
-		 * Add to cache if not already full.
-		 */
-		@SuppressWarnings("unchecked")
-		public final boolean add(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle) {
-			Entry<T> entry = newEntry(chunk, nioBuffer, handle);
-			boolean queued = queue.offer(entry);
-			if (!queued) {
-				// If it was not possible to cache the chunk, immediately recycle the entry
-				entry.recycle();
-			}
-
-			return queued;
-		}
-
-		/**
-		 * Allocate something out of the cache if possible and remove the entry from the cache.
-		 */
-		public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
-			Entry<T> entry = queue.poll();
-			if (entry == null) {
-				return false;
-			}
-			initBuf(entry.chunk, entry.nioBuffer, entry.handle, buf, reqCapacity);
-			entry.recycle();
-
-			// allocations is not thread-safe which is fine as this is only called from the same thread all time.
-			++ allocations;
-			return true;
-		}
-
-		/**
-		 * Clear out this cache and free up all previous cached {@link PoolChunk}s and {@code handle}s.
-		 */
-		public final int free(boolean finalizer) {
-			return free(Integer.MAX_VALUE, finalizer);
-		}
-
-		private int free(int max, boolean finalizer) {
-			int numFreed = 0;
-			for (; numFreed < max; numFreed++) {
-				Entry<T> entry = queue.poll();
-				if (entry != null) {
-					freeEntry(entry, finalizer);
-				} else {
-					// all cleared
-					return numFreed;
-				}
-			}
-			return numFreed;
-		}
-
-		/**
-		 * Free up cached {@link PoolChunk}s if not allocated frequently enough.
-		 */
-		public final void trim() {
-			int free = size - allocations;
-			allocations = 0;
-
-			// We not even allocated all the number that are
-			if (free > 0) {
-				free(free, false);
-			}
-		}
-
-		@SuppressWarnings({ "unchecked", "rawtypes" })
-		private  void freeEntry(Entry entry, boolean finalizer) {
-			PoolChunk chunk = entry.chunk;
-			long handle = entry.handle;
-			ByteBuffer nioBuffer = entry.nioBuffer;
-
-			if (!finalizer) {
-				// recycle now so PoolChunk can be GC'ed. This will only be done if this is not freed because of
-				// a finalizer.
-				entry.recycle();
-			}
-
-			chunk.arena.freeChunk(chunk, handle, sizeClass, nioBuffer, finalizer);
-		}
-
-		static final class Entry<T> {
-			final Handle<Entry<?>> recyclerHandle;
-			PoolChunk<T> chunk;
-			ByteBuffer nioBuffer;
-			long handle = -1;
-
-			Entry(Handle<Entry<?>> recyclerHandle) {
-				this.recyclerHandle = recyclerHandle;
-			}
-
-			void recycle() {
-				chunk = null;
-				nioBuffer = null;
-				handle = -1;
-				recyclerHandle.recycle(this);
-			}
-		}
-
-		@SuppressWarnings("rawtypes")
-		private static Entry newEntry(PoolChunk<?> chunk, ByteBuffer nioBuffer, long handle) {
-			Entry entry = RECYCLER.get();
-			entry.chunk = chunk;
-			entry.nioBuffer = nioBuffer;
-			entry.handle = handle;
-			return entry;
-		}
-
-		@SuppressWarnings("rawtypes")
-		private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() {
-			@SuppressWarnings("unchecked")
-			@Override
-			protected Entry newObject(Handle<Entry> handle) {
-				return new Entry(handle);
-			}
-		};
-	}
-}
diff --git a/flink-python/src/main/java/org/apache/beam/vendor/grpc/v1p21p0/io/netty/buffer/PooledByteBufAllocator.java b/flink-python/src/main/java/org/apache/beam/vendor/grpc/v1p21p0/io/netty/buffer/PooledByteBufAllocator.java
deleted file mode 100644
index 2124bdc..0000000
--- a/flink-python/src/main/java/org/apache/beam/vendor/grpc/v1p21p0/io/netty/buffer/PooledByteBufAllocator.java
+++ /dev/null
@@ -1,640 +0,0 @@
-/*
- * Copyright 2012 The Netty Project
- *
- * The Netty Project licenses this file to you under the Apache License,
- * version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at:
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.beam.vendor.grpc.v1p21p0.io.netty.buffer;
-
-import static org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
-
-import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.NettyRuntime;
-import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.concurrent.FastThreadLocal;
-import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.concurrent.FastThreadLocalThread;
-import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.PlatformDependent;
-import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.StringUtil;
-import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.SystemPropertyUtil;
-import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.logging.InternalLogger;
-import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.logging.InternalLoggerFactory;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-// This class is copied from Netty's io.netty.buffer.PooledByteBufAllocator,
-// can be removed after Beam bumps its shaded netty version to 1.22+ (BEAM-9030).
-//
-// Changed lines: 458
-
-public class PooledByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocatorMetricProvider {
-
-	private static final InternalLogger logger = InternalLoggerFactory.getInstance(PooledByteBufAllocator.class);
-	private static final int DEFAULT_NUM_HEAP_ARENA;
-	private static final int DEFAULT_NUM_DIRECT_ARENA;
-
-	private static final int DEFAULT_PAGE_SIZE;
-	private static final int DEFAULT_MAX_ORDER; // 8192 << 11 = 16 MiB per chunk
-	private static final int DEFAULT_TINY_CACHE_SIZE;
-	private static final int DEFAULT_SMALL_CACHE_SIZE;
-	private static final int DEFAULT_NORMAL_CACHE_SIZE;
-	private static final int DEFAULT_MAX_CACHED_BUFFER_CAPACITY;
-	private static final int DEFAULT_CACHE_TRIM_INTERVAL;
-	private static final boolean DEFAULT_USE_CACHE_FOR_ALL_THREADS;
-	private static final int DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT;
-	static final int DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK;
-
-	private static final int MIN_PAGE_SIZE = 4096;
-	private static final int MAX_CHUNK_SIZE = (int) (((long) Integer.MAX_VALUE + 1) / 2);
-
-	static {
-		int defaultPageSize = SystemPropertyUtil.getInt("io.netty.allocator.pageSize", 8192);
-		Throwable pageSizeFallbackCause = null;
-		try {
-			validateAndCalculatePageShifts(defaultPageSize);
-		} catch (Throwable t) {
-			pageSizeFallbackCause = t;
-			defaultPageSize = 8192;
-		}
-		DEFAULT_PAGE_SIZE = defaultPageSize;
-
-		int defaultMaxOrder = SystemPropertyUtil.getInt("io.netty.allocator.maxOrder", 11);
-		Throwable maxOrderFallbackCause = null;
-		try {
-			validateAndCalculateChunkSize(DEFAULT_PAGE_SIZE, defaultMaxOrder);
-		} catch (Throwable t) {
-			maxOrderFallbackCause = t;
-			defaultMaxOrder = 11;
-		}
-		DEFAULT_MAX_ORDER = defaultMaxOrder;
-
-		// Determine reasonable default for nHeapArena and nDirectArena.
-		// Assuming each arena has 3 chunks, the pool should not consume more than 50% of max memory.
-		final Runtime runtime = Runtime.getRuntime();
-
-		/*
-		 * We use 2 * available processors by default to reduce contention as we use 2 * available processors for the
-		 * number of EventLoops in NIO and EPOLL as well. If we choose a smaller number we will run into hot spots as
-		 * allocation and de-allocation needs to be synchronized on the PoolArena.
-		 *
-		 * See https://github.com/netty/netty/issues/3888.
-		 */
-		final int defaultMinNumArena = NettyRuntime.availableProcessors() * 2;
-		final int defaultChunkSize = DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER;
-		DEFAULT_NUM_HEAP_ARENA = Math.max(0,
-			SystemPropertyUtil.getInt(
-				"io.netty.allocator.numHeapArenas",
-				(int) Math.min(
-					defaultMinNumArena,
-					runtime.maxMemory() / defaultChunkSize / 2 / 3)));
-		DEFAULT_NUM_DIRECT_ARENA = Math.max(0,
-			SystemPropertyUtil.getInt(
-				"io.netty.allocator.numDirectArenas",
-				(int) Math.min(
-					defaultMinNumArena,
-					PlatformDependent.maxDirectMemory() / defaultChunkSize / 2 / 3)));
-
-		// cache sizes
-		DEFAULT_TINY_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.tinyCacheSize", 512);
-		DEFAULT_SMALL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.smallCacheSize", 256);
-		DEFAULT_NORMAL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.normalCacheSize", 64);
-
-		// 32 kb is the default maximum capacity of the cached buffer. Similar to what is explained in
-		// 'Scalable memory allocation using jemalloc'
-		DEFAULT_MAX_CACHED_BUFFER_CAPACITY = SystemPropertyUtil.getInt(
-			"io.netty.allocator.maxCachedBufferCapacity", 32 * 1024);
-
-		// the number of threshold of allocations when cached entries will be freed up if not frequently used
-		DEFAULT_CACHE_TRIM_INTERVAL = SystemPropertyUtil.getInt(
-			"io.netty.allocator.cacheTrimInterval", 8192);
-
-		DEFAULT_USE_CACHE_FOR_ALL_THREADS = SystemPropertyUtil.getBoolean(
-			"io.netty.allocator.useCacheForAllThreads", true);
-
-		DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT = SystemPropertyUtil.getInt(
-			"io.netty.allocator.directMemoryCacheAlignment", 0);
-
-		// Use 1023 by default as we use an ArrayDeque as backing storage which will then allocate an internal array
-		// of 1024 elements. Otherwise we would allocate 2048 and only use 1024 which is wasteful.
-		DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK = SystemPropertyUtil.getInt(
-			"io.netty.allocator.maxCachedByteBuffersPerChunk", 1023);
-
-		if (logger.isDebugEnabled()) {
-			logger.debug("-Dio.netty.allocator.numHeapArenas: {}", DEFAULT_NUM_HEAP_ARENA);
-			logger.debug("-Dio.netty.allocator.numDirectArenas: {}", DEFAULT_NUM_DIRECT_ARENA);
-			if (pageSizeFallbackCause == null) {
-				logger.debug("-Dio.netty.allocator.pageSize: {}", DEFAULT_PAGE_SIZE);
-			} else {
-				logger.debug("-Dio.netty.allocator.pageSize: {}", DEFAULT_PAGE_SIZE, pageSizeFallbackCause);
-			}
-			if (maxOrderFallbackCause == null) {
-				logger.debug("-Dio.netty.allocator.maxOrder: {}", DEFAULT_MAX_ORDER);
-			} else {
-				logger.debug("-Dio.netty.allocator.maxOrder: {}", DEFAULT_MAX_ORDER, maxOrderFallbackCause);
-			}
-			logger.debug("-Dio.netty.allocator.chunkSize: {}", DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER);
-			logger.debug("-Dio.netty.allocator.tinyCacheSize: {}", DEFAULT_TINY_CACHE_SIZE);
-			logger.debug("-Dio.netty.allocator.smallCacheSize: {}", DEFAULT_SMALL_CACHE_SIZE);
-			logger.debug("-Dio.netty.allocator.normalCacheSize: {}", DEFAULT_NORMAL_CACHE_SIZE);
-			logger.debug("-Dio.netty.allocator.maxCachedBufferCapacity: {}", DEFAULT_MAX_CACHED_BUFFER_CAPACITY);
-			logger.debug("-Dio.netty.allocator.cacheTrimInterval: {}", DEFAULT_CACHE_TRIM_INTERVAL);
-			logger.debug("-Dio.netty.allocator.useCacheForAllThreads: {}", DEFAULT_USE_CACHE_FOR_ALL_THREADS);
-			logger.debug("-Dio.netty.allocator.maxCachedByteBuffersPerChunk: {}",
-				DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK);
-		}
-	}
-
-	public static final PooledByteBufAllocator DEFAULT =
-		new PooledByteBufAllocator(PlatformDependent.directBufferPreferred());
-
-	private final PoolArena<byte[]>[] heapArenas;
-	private final PoolArena<ByteBuffer>[] directArenas;
-	private final int tinyCacheSize;
-	private final int smallCacheSize;
-	private final int normalCacheSize;
-	private final List<PoolArenaMetric> heapArenaMetrics;
-	private final List<PoolArenaMetric> directArenaMetrics;
-	private final PoolThreadLocalCache threadCache;
-	private final int chunkSize;
-	private final PooledByteBufAllocatorMetric metric;
-
-	public PooledByteBufAllocator() {
-		this(false);
-	}
-
-	@SuppressWarnings("deprecation")
-	public PooledByteBufAllocator(boolean preferDirect) {
-		this(preferDirect, DEFAULT_NUM_HEAP_ARENA, DEFAULT_NUM_DIRECT_ARENA, DEFAULT_PAGE_SIZE, DEFAULT_MAX_ORDER);
-	}
-
-	@SuppressWarnings("deprecation")
-	public PooledByteBufAllocator(int nHeapArena, int nDirectArena, int pageSize, int maxOrder) {
-		this(false, nHeapArena, nDirectArena, pageSize, maxOrder);
-	}
-
-	/**
-	 * @deprecated use
-	 * {@link PooledByteBufAllocator#PooledByteBufAllocator(boolean, int, int, int, int, int, int, int, boolean)}
-	 */
-	@Deprecated
-	public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder) {
-		this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder,
-			DEFAULT_TINY_CACHE_SIZE, DEFAULT_SMALL_CACHE_SIZE, DEFAULT_NORMAL_CACHE_SIZE);
-	}
-
-	/**
-	 * @deprecated use
-	 * {@link PooledByteBufAllocator#PooledByteBufAllocator(boolean, int, int, int, int, int, int, int, boolean)}
-	 */
-	@Deprecated
-	public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
-								  int tinyCacheSize, int smallCacheSize, int normalCacheSize) {
-		this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder, tinyCacheSize, smallCacheSize,
-			normalCacheSize, DEFAULT_USE_CACHE_FOR_ALL_THREADS, DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT);
-	}
-
-	public PooledByteBufAllocator(boolean preferDirect, int nHeapArena,
-								  int nDirectArena, int pageSize, int maxOrder, int tinyCacheSize,
-								  int smallCacheSize, int normalCacheSize,
-								  boolean useCacheForAllThreads) {
-		this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder,
-			tinyCacheSize, smallCacheSize, normalCacheSize,
-			useCacheForAllThreads, DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT);
-	}
-
-	public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
-								  int tinyCacheSize, int smallCacheSize, int normalCacheSize,
-								  boolean useCacheForAllThreads, int directMemoryCacheAlignment) {
-		super(preferDirect);
-		threadCache = new PoolThreadLocalCache(useCacheForAllThreads);
-		this.tinyCacheSize = tinyCacheSize;
-		this.smallCacheSize = smallCacheSize;
-		this.normalCacheSize = normalCacheSize;
-		chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);
-
-		checkPositiveOrZero(nHeapArena, "nHeapArena");
-		checkPositiveOrZero(nDirectArena, "nDirectArena");
-
-		checkPositiveOrZero(directMemoryCacheAlignment, "directMemoryCacheAlignment");
-		if (directMemoryCacheAlignment > 0 && !isDirectMemoryCacheAlignmentSupported()) {
-			throw new IllegalArgumentException("directMemoryCacheAlignment is not supported");
-		}
-
-		if ((directMemoryCacheAlignment & -directMemoryCacheAlignment) != directMemoryCacheAlignment) {
-			throw new IllegalArgumentException("directMemoryCacheAlignment: "
-				+ directMemoryCacheAlignment + " (expected: power of two)");
-		}
-
-		int pageShifts = validateAndCalculatePageShifts(pageSize);
-
-		if (nHeapArena > 0) {
-			heapArenas = newArenaArray(nHeapArena);
-			List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length);
-			for (int i = 0; i < heapArenas.length; i ++) {
-				PoolArena.HeapArena arena = new PoolArena.HeapArena(this,
-					pageSize, maxOrder, pageShifts, chunkSize,
-					directMemoryCacheAlignment);
-				heapArenas[i] = arena;
-				metrics.add(arena);
-			}
-			heapArenaMetrics = Collections.unmodifiableList(metrics);
-		} else {
-			heapArenas = null;
-			heapArenaMetrics = Collections.emptyList();
-		}
-
-		if (nDirectArena > 0) {
-			directArenas = newArenaArray(nDirectArena);
-			List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length);
-			for (int i = 0; i < directArenas.length; i ++) {
-				PoolArena.DirectArena arena = new PoolArena.DirectArena(
-					this, pageSize, maxOrder, pageShifts, chunkSize, directMemoryCacheAlignment);
-				directArenas[i] = arena;
-				metrics.add(arena);
-			}
-			directArenaMetrics = Collections.unmodifiableList(metrics);
-		} else {
-			directArenas = null;
-			directArenaMetrics = Collections.emptyList();
-		}
-		metric = new PooledByteBufAllocatorMetric(this);
-	}
-
-	@SuppressWarnings("unchecked")
-	private static <T> PoolArena<T>[] newArenaArray(int size) {
-		return new PoolArena[size];
-	}
-
-	private static int validateAndCalculatePageShifts(int pageSize) {
-		if (pageSize < MIN_PAGE_SIZE) {
-			throw new IllegalArgumentException("pageSize: " + pageSize + " (expected: " + MIN_PAGE_SIZE + ")");
-		}
-
-		if ((pageSize & pageSize - 1) != 0) {
-			throw new IllegalArgumentException("pageSize: " + pageSize + " (expected: power of 2)");
-		}
-
-		// Logarithm base 2. At this point we know that pageSize is a power of two.
-		return Integer.SIZE - 1 - Integer.numberOfLeadingZeros(pageSize);
-	}
-
-	private static int validateAndCalculateChunkSize(int pageSize, int maxOrder) {
-		if (maxOrder > 14) {
-			throw new IllegalArgumentException("maxOrder: " + maxOrder + " (expected: 0-14)");
-		}
-
-		// Ensure the resulting chunkSize does not overflow.
-		int chunkSize = pageSize;
-		for (int i = maxOrder; i > 0; i --) {
-			if (chunkSize > MAX_CHUNK_SIZE / 2) {
-				throw new IllegalArgumentException(String.format(
-					"pageSize (%d) << maxOrder (%d) must not exceed %d", pageSize, maxOrder, MAX_CHUNK_SIZE));
-			}
-			chunkSize <<= 1;
-		}
-		return chunkSize;
-	}
-
-	@Override
-	protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
-		PoolThreadCache cache = threadCache.get();
-		PoolArena<byte[]> heapArena = cache.heapArena;
-
-		final ByteBuf buf;
-		if (heapArena != null) {
-			buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
-		} else {
-			buf = PlatformDependent.hasUnsafe() ?
-				new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
-				new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
-		}
-
-		return toLeakAwareBuffer(buf);
-	}
-
-	@Override
-	protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
-		PoolThreadCache cache = threadCache.get();
-		PoolArena<ByteBuffer> directArena = cache.directArena;
-
-		final ByteBuf buf;
-		if (directArena != null) {
-			buf = directArena.allocate(cache, initialCapacity, maxCapacity);
-		} else {
-			buf = PlatformDependent.hasUnsafe() ?
-				UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
-				new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
-		}
-
-		return toLeakAwareBuffer(buf);
-	}
-
-	/**
-	 * Default number of heap arenas - System Property: io.netty.allocator.numHeapArenas - default 2 * cores
-	 */
-	public static int defaultNumHeapArena() {
-		return DEFAULT_NUM_HEAP_ARENA;
-	}
-
-	/**
-	 * Default number of direct arenas - System Property: io.netty.allocator.numDirectArenas - default 2 * cores
-	 */
-	public static int defaultNumDirectArena() {
-		return DEFAULT_NUM_DIRECT_ARENA;
-	}
-
-	/**
-	 * Default buffer page size - System Property: io.netty.allocator.pageSize - default 8192
-	 */
-	public static int defaultPageSize() {
-		return DEFAULT_PAGE_SIZE;
-	}
-
-	/**
-	 * Default maximum order - System Property: io.netty.allocator.maxOrder - default 11
-	 */
-	public static int defaultMaxOrder() {
-		return DEFAULT_MAX_ORDER;
-	}
-
-	/**
-	 * Default thread caching behavior - System Property: io.netty.allocator.useCacheForAllThreads - default true
-	 */
-	public static boolean defaultUseCacheForAllThreads() {
-		return DEFAULT_USE_CACHE_FOR_ALL_THREADS;
-	}
-
-	/**
-	 * Default prefer direct - System Property: io.netty.noPreferDirect - default false
-	 */
-	public static boolean defaultPreferDirect() {
-		return PlatformDependent.directBufferPreferred();
-	}
-
-	/**
-	 * Default tiny cache size - System Property: io.netty.allocator.tinyCacheSize - default 512
-	 */
-	public static int defaultTinyCacheSize() {
-		return DEFAULT_TINY_CACHE_SIZE;
-	}
-
-	/**
-	 * Default small cache size - System Property: io.netty.allocator.smallCacheSize - default 256
-	 */
-	public static int defaultSmallCacheSize() {
-		return DEFAULT_SMALL_CACHE_SIZE;
-	}
-
-	/**
-	 * Default normal cache size - System Property: io.netty.allocator.normalCacheSize - default 64
-	 */
-	public static int defaultNormalCacheSize() {
-		return DEFAULT_NORMAL_CACHE_SIZE;
-	}
-
-	/**
-	 * Return {@code true} if direct memory cache alignment is supported, {@code false} otherwise.
-	 */
-	public static boolean isDirectMemoryCacheAlignmentSupported() {
-		return PlatformDependent.hasUnsafe();
-	}
-
-	@Override
-	public boolean isDirectBufferPooled() {
-		return directArenas != null;
-	}
-
-	/**
-	 * Returns {@code true} if the calling {@link Thread} has a {@link ThreadLocal} cache for the allocated
-	 * buffers.
-	 */
-	@Deprecated
-	public boolean hasThreadLocalCache() {
-		return threadCache.isSet();
-	}
-
-	/**
-	 * Free all cached buffers for the calling {@link Thread}.
-	 */
-	@Deprecated
-	public void freeThreadLocalCache() {
-		threadCache.remove();
-	}
-
-	final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
-		private final boolean useCacheForAllThreads;
-
-		PoolThreadLocalCache(boolean useCacheForAllThreads) {
-			this.useCacheForAllThreads = useCacheForAllThreads;
-		}
-
-		@Override
-		protected synchronized PoolThreadCache initialValue() {
-			final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
-			final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
-
-			Thread current = Thread.currentThread();
-			if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {
-				return new PoolThreadCache(
-					heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
-					DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
-			}
-			// No caching so just use 0 as sizes.
-			return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
-		}
-
-		@Override
-		protected void onRemoval(PoolThreadCache threadCache) {
-			threadCache.free(false);
-		}
-
-		private <T> PoolArena<T> leastUsedArena(PoolArena<T>[] arenas) {
-			if (arenas == null || arenas.length == 0) {
-				return null;
-			}
-
-			PoolArena<T> minArena = arenas[0];
-			for (int i = 1; i < arenas.length; i++) {
-				PoolArena<T> arena = arenas[i];
-				if (arena.numThreadCaches.get() < minArena.numThreadCaches.get()) {
-					minArena = arena;
-				}
-			}
-
-			return minArena;
-		}
-	}
-
-	@Override
-	public PooledByteBufAllocatorMetric metric() {
-		return metric;
-	}
-
-	/**
-	 * Return the number of heap arenas.
-	 *
-	 * @deprecated use {@link PooledByteBufAllocatorMetric#numHeapArenas()}.
-	 */
-	@Deprecated
-	public int numHeapArenas() {
-		return heapArenaMetrics.size();
-	}
-
-	/**
-	 * Return the number of direct arenas.
-	 *
-	 * @deprecated use {@link PooledByteBufAllocatorMetric#numDirectArenas()}.
-	 */
-	@Deprecated
-	public int numDirectArenas() {
-		return directArenaMetrics.size();
-	}
-
-	/**
-	 * Return a {@link List} of all heap {@link PoolArenaMetric}s that are provided by this pool.
-	 *
-	 * @deprecated use {@link PooledByteBufAllocatorMetric#heapArenas()}.
-	 */
-	@Deprecated
-	public List<PoolArenaMetric> heapArenas() {
-		return heapArenaMetrics;
-	}
-
-	/**
-	 * Return a {@link List} of all direct {@link PoolArenaMetric}s that are provided by this pool.
-	 *
-	 * @deprecated use {@link PooledByteBufAllocatorMetric#directArenas()}.
-	 */
-	@Deprecated
-	public List<PoolArenaMetric> directArenas() {
-		return directArenaMetrics;
-	}
-
-	/**
-	 * Return the number of thread local caches used by this {@link PooledByteBufAllocator}.
-	 *
-	 * @deprecated use {@link PooledByteBufAllocatorMetric#numThreadLocalCaches()}.
-	 */
-	@Deprecated
-	public int numThreadLocalCaches() {
-		PoolArena<?>[] arenas = heapArenas != null ? heapArenas : directArenas;
-		if (arenas == null) {
-			return 0;
-		}
-
-		int total = 0;
-		for (PoolArena<?> arena : arenas) {
-			total += arena.numThreadCaches.get();
-		}
-
-		return total;
-	}
-
-	/**
-	 * Return the size of the tiny cache.
-	 *
-	 * @deprecated use {@link PooledByteBufAllocatorMetric#tinyCacheSize()}.
-	 */
-	@Deprecated
-	public int tinyCacheSize() {
-		return tinyCacheSize;
-	}
-
-	/**
-	 * Return the size of the small cache.
-	 *
-	 * @deprecated use {@link PooledByteBufAllocatorMetric#smallCacheSize()}.
-	 */
-	@Deprecated
-	public int smallCacheSize() {
-		return smallCacheSize;
-	}
-
-	/**
-	 * Return the size of the normal cache.
-	 *
-	 * @deprecated use {@link PooledByteBufAllocatorMetric#normalCacheSize()}.
-	 */
-	@Deprecated
-	public int normalCacheSize() {
-		return normalCacheSize;
-	}
-
-	/**
-	 * Return the chunk size for an arena.
-	 *
-	 * @deprecated use {@link PooledByteBufAllocatorMetric#chunkSize()}.
-	 */
-	@Deprecated
-	public final int chunkSize() {
-		return chunkSize;
-	}
-
-	final long usedHeapMemory() {
-		return usedMemory(heapArenas);
-	}
-
-	final long usedDirectMemory() {
-		return usedMemory(directArenas);
-	}
-
-	private static long usedMemory(PoolArena<?>[] arenas) {
-		if (arenas == null) {
-			return -1;
-		}
-		long used = 0;
-		for (PoolArena<?> arena : arenas) {
-			used += arena.numActiveBytes();
-			if (used < 0) {
-				return Long.MAX_VALUE;
-			}
-		}
-		return used;
-	}
-
-	final PoolThreadCache threadCache() {
-		PoolThreadCache cache =  threadCache.get();
-		assert cache != null;
-		return cache;
-	}
-
-	/**
-	 * Returns the status of the allocator (which contains all metrics) as string. Be aware this may be expensive
-	 * and so should not called too frequently.
-	 */
-	public String dumpStats() {
-		int heapArenasLen = heapArenas == null ? 0 : heapArenas.length;
-		StringBuilder buf = new StringBuilder(512)
-			.append(heapArenasLen)
-			.append(" heap arena(s):")
-			.append(StringUtil.NEWLINE);
-		if (heapArenasLen > 0) {
-			for (PoolArena<byte[]> a: heapArenas) {
-				buf.append(a);
-			}
-		}
-
-		int directArenasLen = directArenas == null ? 0 : directArenas.length;
-
-		buf.append(directArenasLen)
-			.append(" direct arena(s):")
-			.append(StringUtil.NEWLINE);
-		if (directArenasLen > 0) {
-			for (PoolArena<ByteBuffer> a: directArenas) {
-				buf.append(a);
-			}
-		}
-
-		return buf.toString();
-	}
-}
diff --git a/flink-python/src/main/java/org/apache/flink/datastream/runtime/runners/python/beam/BeamDataStreamPythonStatelessFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/datastream/runtime/runners/python/beam/BeamDataStreamPythonStatelessFunctionRunner.java
index 761d9d8..488d8fa 100644
--- a/flink-python/src/main/java/org/apache/flink/datastream/runtime/runners/python/beam/BeamDataStreamPythonStatelessFunctionRunner.java
+++ b/flink-python/src/main/java/org/apache/flink/datastream/runtime/runners/python/beam/BeamDataStreamPythonStatelessFunctionRunner.java
@@ -82,7 +82,7 @@ public class BeamDataStreamPythonStatelessFunctionRunner extends BeamPythonState
 			.setSpec(
 		RunnerApi.FunctionSpec.newBuilder()
 					.setUrn(this.coderUrn)
-					.setPayload(org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.ByteString.copyFrom(
+					.setPayload(org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString.copyFrom(
 		PythonTypeUtils.TypeInfoToProtoConverter.toTypeInfoProto(builtFieldType).toByteArray()
 					)).build()
 			).build();
diff --git a/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java b/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
index 791c9d4..7da5563 100644
--- a/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
+++ b/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
@@ -136,7 +136,7 @@ public class PythonOptions {
 		.defaultValue("python")
 		.withDescription("Specify the path of the python interpreter used to execute the python " +
 			"UDF worker. The python UDF worker depends on Python 3.5+, Apache Beam " +
-			"(version == 2.19.0), Pip (version >= 7.1.0) and SetupTools (version >= 37.0.0). " +
+			"(version == 2.23.0), Pip (version >= 7.1.0) and SetupTools (version >= 37.0.0). " +
 			"Please ensure that the specified environment meets the above requirements. The " +
 			"option is equivalent to the command line option \"-pyexec\".");
 
diff --git a/flink-python/src/main/java/org/apache/flink/python/metric/FlinkMetricContainer.java b/flink-python/src/main/java/org/apache/flink/python/metric/FlinkMetricContainer.java
index dc61cc5..ae36b0a 100644
--- a/flink-python/src/main/java/org/apache/flink/python/metric/FlinkMetricContainer.java
+++ b/flink-python/src/main/java/org/apache/flink/python/metric/FlinkMetricContainer.java
@@ -106,9 +106,14 @@ public final class FlinkMetricContainer {
 
 	private boolean isUserMetric(MetricResult metricResult) {
 		MetricName metricName = metricResult.getKey().metricName();
-		return (metricName instanceof MonitoringInfoMetricName) &&
-			((MonitoringInfoMetricName) metricName).getUrn()
-				.contains(MonitoringInfoConstants.Urns.USER_COUNTER);
+		if (metricName instanceof MonitoringInfoMetricName) {
+			String urn = ((MonitoringInfoMetricName) metricName).getUrn();
+			return urn.contains(MonitoringInfoConstants.Urns.USER_SUM_INT64) ||
+				urn.contains(MonitoringInfoConstants.Urns.USER_SUM_DOUBLE) ||
+				urn.contains(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_DOUBLE) ||
+				urn.contains(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_INT64);
+		}
+		return false;
 	}
 
 	private void updateCounterOrMeter(Iterable<MetricResult<Long>> counters) {
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
index 8ebf3da..2516147 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
@@ -44,7 +44,8 @@ import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.PortablePipelineOptions;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.Struct;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Struct;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -227,7 +228,7 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner {
 			remoteBundle = stageBundleFactory.getBundle(createOutputReceiverFactory(), stateRequestHandler, progressHandler);
 			mainInputReceiver =
 				Preconditions.checkNotNull(
-					remoteBundle.getInputReceivers().get(MAIN_INPUT_ID),
+					Iterables.getOnlyElement(remoteBundle.getInputReceivers().values()),
 					"Failed to retrieve main input receiver.");
 		} catch (Throwable t) {
 			throw new RuntimeException("Failed to start remote bundle", t);
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonStatelessFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonStatelessFunctionRunner.java
index f97e26b..91acbc8 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonStatelessFunctionRunner.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonStatelessFunctionRunner.java
@@ -40,6 +40,8 @@ import org.apache.beam.sdk.util.WindowedValue;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -100,7 +102,7 @@ public abstract class BeamPythonStatelessFunctionRunner extends BeamPythonFuncti
 						.setSpec(RunnerApi.FunctionSpec.newBuilder()
 							.setUrn(functionUrn)
 							.setPayload(
-								org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.ByteString.copyFrom(
+								org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString.copyFrom(
 									getUserDefinedFunctionsProtoBytes()))
 							.build())
 						.putInputs(MAIN_INPUT_NAME, INPUT_ID)
@@ -137,18 +139,28 @@ public abstract class BeamPythonStatelessFunctionRunner extends BeamPythonFuncti
 			components, createPythonExecutionEnvironment(), input, sideInputs, userStates, timers, transforms, outputs, createValueOnlyWireCoderSetting());
 	}
 
-	private RunnerApi.WireCoderSetting createValueOnlyWireCoderSetting() throws IOException {
+	private Collection<RunnerApi.ExecutableStagePayload.WireCoderSetting> createValueOnlyWireCoderSetting() throws IOException {
 		WindowedValue<byte[]> value = WindowedValue.valueInGlobalWindow(new byte[0]);
 		Coder<? extends BoundedWindow> windowCoder = GlobalWindow.Coder.INSTANCE;
 		WindowedValue.FullWindowedValueCoder<byte[]> windowedValueCoder =
 			WindowedValue.FullWindowedValueCoder.of(ByteArrayCoder.of(), windowCoder);
 		ByteArrayOutputStream baos = new ByteArrayOutputStream();
 		windowedValueCoder.encode(value, baos);
-		return RunnerApi.WireCoderSetting.newBuilder()
-			.setUrn(getUrn(RunnerApi.StandardCoders.Enum.PARAM_WINDOWED_VALUE))
-			.setPayload(
-				org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.ByteString.copyFrom(baos.toByteArray()))
-			.build();
+
+		return Arrays.asList(
+			RunnerApi.ExecutableStagePayload.WireCoderSetting.newBuilder()
+				.setUrn(getUrn(RunnerApi.StandardCoders.Enum.PARAM_WINDOWED_VALUE))
+				.setPayload(
+					org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString.copyFrom(baos.toByteArray()))
+				.setInputOrOutputId(INPUT_ID)
+				.build(),
+			RunnerApi.ExecutableStagePayload.WireCoderSetting.newBuilder()
+				.setUrn(getUrn(RunnerApi.StandardCoders.Enum.PARAM_WINDOWED_VALUE))
+				.setPayload(
+					org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString.copyFrom(baos.toByteArray()))
+				.setInputOrOutputId(OUTPUT_ID)
+				.build()
+		);
 	}
 
 	protected abstract byte[] getUserDefinedFunctionsProtoBytes();
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonStatelessFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonStatelessFunctionRunner.java
index fc97b16..91ff931 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonStatelessFunctionRunner.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonStatelessFunctionRunner.java
@@ -65,7 +65,7 @@ public class BeamTablePythonStatelessFunctionRunner extends BeamPythonStatelessF
 			.setSpec(
 				RunnerApi.FunctionSpec.newBuilder()
 					.setUrn(coderUrn)
-					.setPayload(org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.ByteString.copyFrom(
+					.setPayload(org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString.copyFrom(
 						toProtoType(rowType).getRowSchema().toByteArray()))
 					.build())
 			.build();
diff --git a/flink-python/src/main/resources/META-INF/NOTICE b/flink-python/src/main/resources/META-INF/NOTICE
index c7c6a96..1409c57 100644
--- a/flink-python/src/main/resources/META-INF/NOTICE
+++ b/flink-python/src/main/resources/META-INF/NOTICE
@@ -16,24 +16,24 @@ This project bundles the following dependencies under the Apache Software Licens
 - org.apache.arrow:arrow-format:0.16.0
 - org.apache.arrow:arrow-memory:0.16.0
 - org.apache.arrow:arrow-vector:0.16.0
-- org.apache.beam:beam-model-fn-execution:2.19.0
-- org.apache.beam:beam-model-job-management:2.19.0
-- org.apache.beam:beam-model-pipeline:2.19.0
-- org.apache.beam:beam-runners-core-construction-java:2.19.0
-- org.apache.beam:beam-runners-core-java:2.19.0
-- org.apache.beam:beam-runners-java-fn-execution:2.19.0
-- org.apache.beam:beam-sdks-java-core:2.19.0
-- org.apache.beam:beam-sdks-java-fn-execution:2.19.0
-- org.apache.beam:beam-vendor-bytebuddy-1_9_3:0.1
-- org.apache.beam:beam-vendor-sdks-java-extensions-protobuf:2.19.0
+- org.apache.beam:beam-model-fn-execution:2.23.0
+- org.apache.beam:beam-model-job-management:2.23.0
+- org.apache.beam:beam-model-pipeline:2.23.0
+- org.apache.beam:beam-runners-core-construction-java:2.23.0
+- org.apache.beam:beam-runners-core-java:2.23.0
+- org.apache.beam:beam-runners-java-fn-execution:2.23.0
+- org.apache.beam:beam-sdks-java-core:2.23.0
+- org.apache.beam:beam-sdks-java-fn-execution:2.23.0
+- org.apache.beam:beam-vendor-bytebuddy-1_10_8:0.1
+- org.apache.beam:beam-vendor-sdks-java-extensions-protobuf:2.23.0
 - org.apache.beam:beam-vendor-guava-26_0-jre:0.1
-- org.apache.beam:beam-vendor-grpc-1_21_0:0.1
+- org.apache.beam:beam-vendor-grpc-1_26_0:0.3
 
 This project bundles the following dependencies under the BSD license.
 See bundled license files for details
 
 - net.sf.py4j:py4j:0.10.8.1
-- com.google.protobuf:protobuf-java:3.7.1
+- com.google.protobuf:protobuf-java:3.11.1
 
 This project bundles the following dependencies under the MIT license. (https://opensource.org/licenses/MIT)
 See bundled license files for details.
@@ -43,35 +43,35 @@ See bundled license files for details.
 The bundled Apache Beam dependencies bundle the following dependencies under the Apache Software License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
 
 - com.google.api.grpc:proto-google-common-protos:1.12.0
-- com.google.code.gson:gson:2.7
+- com.google.code.gson:gson:2.8.6
 - com.google.guava:guava:26.0-jre
-- io.grpc:grpc-auth:1.21.0
-- io.grpc:grpc-core:1.21.0
-- io.grpc:grpc-context:1.21.0
-- io.grpc:grpc-netty:1.21.0
-- io.grpc:grpc-protobuf:1.21.0
-- io.grpc:grpc-stub:1.21.0
-- io.grpc:grpc-testing:1.21.0
-- io.netty:netty-buffer:4.1.34.Final
-- io.netty:netty-codec:4.1.34.Final
-- io.netty:netty-codec-http:4.1.34.Final
-- io.netty:netty-codec-http2:4.1.34.Final
-- io.netty:netty-codec-socks:4.1.34.Final
-- io.netty:netty-common:4.1.34.Final
-- io.netty:netty-handler:4.1.34.Final
-- io.netty:netty-handler-proxy:4.1.34.Final
-- io.netty:netty-resolver:4.1.34.Final
-- io.netty:netty-transport:4.1.34.Final
-- io.netty:netty-transport-native-epoll:4.1.34.Final
-- io.netty:netty-transport-native-unix-common:4.1.34.Final
-- io.netty:netty-tcnative-boringssl-static:2.0.22.Final
-- io.opencensus:opencensus-api:0.21.0
-- io.opencensus:opencensus-contrib-grpc-metrics:0.21.0
-- net.bytebuddy:1.9.3
+- io.grpc:grpc-auth:1.26.0
+- io.grpc:grpc-core:1.26.0
+- io.grpc:grpc-context:1.26.0
+- io.grpc:grpc-netty:1.26.0
+- io.grpc:grpc-protobuf:1.26.0
+- io.grpc:grpc-stub:1.26.0
+- io.grpc:grpc-testing:1.26.0
+- io.netty:netty-buffer:4.1.42.Final
+- io.netty:netty-codec:4.1.42.Final
+- io.netty:netty-codec-http:4.1.42.Final
+- io.netty:netty-codec-http2:4.1.42.Final
+- io.netty:netty-codec-socks:4.1.42.Final
+- io.netty:netty-common:4.1.42.Final
+- io.netty:netty-handler:4.1.42.Final
+- io.netty:netty-handler-proxy:4.1.42.Final
+- io.netty:netty-resolver:4.1.42.Final
+- io.netty:netty-transport:4.1.42.Final
+- io.netty:netty-transport-native-epoll:4.1.42.Final
+- io.netty:netty-transport-native-unix-common:4.1.42.Final
+- io.netty:netty-tcnative-boringssl-static:2.0.26.Final
+- io.opencensus:opencensus-api:0.24.0
+- io.opencensus:opencensus-contrib-grpc-metrics:0.24.0
+- net.bytebuddy:1.10.8
 
 The bundled Apache Beam dependencies bundle the following dependencies under the BSD license.
 See bundled license files for details
 
-- com.google.auth:google-auth-library-credentials:0.13.0
-- com.google.protobuf:protobuf-java:3.7.1
-- com.google.protobuf:protobuf-java-util:3.7.1
+- com.google.auth:google-auth-library-credentials:0.18.0
+- com.google.protobuf:protobuf-java:3.11.0
+- com.google.protobuf:protobuf-java-util:3.11.0
diff --git a/flink-python/src/test/java/org/apache/flink/python/metric/FlinkMetricContainerTest.java b/flink-python/src/test/java/org/apache/flink/python/metric/FlinkMetricContainerTest.java
index f05f1b6..266dab8 100644
--- a/flink-python/src/test/java/org/apache/flink/python/metric/FlinkMetricContainerTest.java
+++ b/flink-python/src/test/java/org/apache/flink/python/metric/FlinkMetricContainerTest.java
@@ -28,9 +28,9 @@ import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
 import org.apache.flink.runtime.metrics.groups.GenericMetricGroup;
 import org.apache.flink.runtime.metrics.groups.MetricGroupTest;
 
-import org.apache.beam.model.pipeline.v1.MetricsApi;
 import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
-import org.apache.beam.runners.core.construction.BeamUrns;
+import org.apache.beam.runners.core.metrics.DistributionData;
+import org.apache.beam.runners.core.metrics.GaugeData;
 import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
 import org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder;
 import org.apache.beam.sdk.metrics.DistributionResult;
@@ -66,9 +66,6 @@ public class FlinkMetricContainerTest {
 
 	private FlinkMetricContainer container;
 
-	private static final String GAUGE_URN =
-		BeamUrns.getUrn(MetricsApi.MonitoringInfoTypeUrns.Enum.LATEST_INT64_TYPE);
-
 	private static final List<String> DEFAULT_SCOPE_COMPONENTS = Arrays.asList(
 		"key",
 		"value",
@@ -121,11 +118,11 @@ public class FlinkMetricContainerTest {
 
 		MonitoringInfo userMonitoringInfo =
 			new SimpleMonitoringInfoBuilder()
-				.setUrn(MonitoringInfoConstants.Urns.USER_COUNTER)
+				.setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64)
 				.setLabel(MonitoringInfoConstants.Labels.NAMESPACE, DEFAULT_NAMESPACE)
 				.setLabel(MonitoringInfoConstants.Labels.NAME, "myCounter")
 				.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "anyPTransform")
-				.setInt64Value(111)
+				.setInt64SumValue(111)
 				.build();
 
 		assertThat(userCounter.getCount(), is(0L));
@@ -142,11 +139,11 @@ public class FlinkMetricContainerTest {
 
 		MonitoringInfo userMonitoringInfo =
 			new SimpleMonitoringInfoBuilder()
-				.setUrn(MonitoringInfoConstants.Urns.USER_COUNTER)
+				.setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64)
 				.setLabel(MonitoringInfoConstants.Labels.NAMESPACE, namespace)
 				.setLabel(MonitoringInfoConstants.Labels.NAME, "myMeter")
 				.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "anyPTransform")
-				.setInt64Value(111)
+				.setInt64SumValue(111)
 				.build();
 		assertThat(userMeter.getCount(), is(0L));
 		assertThat(userMeter.getRate(), is(0.0));
@@ -159,17 +156,12 @@ public class FlinkMetricContainerTest {
 
 	@Test
 	public void testGaugeMonitoringInfoUpdate() {
-		MonitoringInfo userMonitoringInfo = MonitoringInfo.newBuilder()
-			.setUrn(MonitoringInfoConstants.Urns.USER_COUNTER)
-			.putLabels(MonitoringInfoConstants.Labels.NAMESPACE, DEFAULT_NAMESPACE)
-			.putLabels(MonitoringInfoConstants.Labels.NAME, "myGauge")
-			.putLabels(MonitoringInfoConstants.Labels.PTRANSFORM, "anyPTransform")
-			.setMetric(MetricsApi.Metric
-				.newBuilder()
-				.setCounterData(
-					MetricsApi.CounterData.newBuilder()
-						.setInt64Value(111L)))
-			.setType(GAUGE_URN)
+		MonitoringInfo userMonitoringInfo = new SimpleMonitoringInfoBuilder()
+			.setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64)
+			.setLabel(MonitoringInfoConstants.Labels.NAMESPACE, DEFAULT_NAMESPACE)
+			.setLabel(MonitoringInfoConstants.Labels.NAME, "myGauge")
+			.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "anyPTransform")
+			.setInt64LatestValue(GaugeData.create(111L))
 			.build();
 
 		container.updateMetrics("step", ImmutableList.of(userMonitoringInfo));
@@ -185,21 +177,12 @@ public class FlinkMetricContainerTest {
 
 	@Test
 	public void testDistributionMonitoringInfoUpdate() {
-		MonitoringInfo userMonitoringInfo = MonitoringInfo.newBuilder()
-			.setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_COUNTER)
-			.putLabels(MonitoringInfoConstants.Labels.NAMESPACE, DEFAULT_NAMESPACE)
-			.putLabels(MonitoringInfoConstants.Labels.NAME, "myDistribution")
-			.putLabels(MonitoringInfoConstants.Labels.PTRANSFORM, "anyPTransform")
-			.setMetric(
-				MetricsApi.Metric.newBuilder()
-					.setDistributionData(
-						MetricsApi.DistributionData.newBuilder()
-							.setIntDistributionData(
-								MetricsApi.IntDistributionData.newBuilder()
-									.setSum(30)
-									.setCount(10)
-									.setMin(1)
-									.setMax(5))))
+		MonitoringInfo userMonitoringInfo = new SimpleMonitoringInfoBuilder()
+			.setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_INT64)
+			.setLabel(MonitoringInfoConstants.Labels.NAMESPACE, DEFAULT_NAMESPACE)
+			.setLabel(MonitoringInfoConstants.Labels.NAME, "myDistribution")
+			.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "anyPTransform")
+			.setInt64DistributionValue(DistributionData.create(30, 10, 1, 5))
 			.build();
 
 		container.updateMetrics("step", ImmutableList.of(userMonitoringInfo));
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java
index 7541ad2..57dc439 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java
@@ -25,7 +25,7 @@ import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonStatele
 import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
-import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.Struct;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Struct;
 
 import java.util.LinkedList;
 import java.util.List;
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java
index 5625cfb..2fc4264 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java
@@ -25,7 +25,7 @@ import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonStatele
 import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
-import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.Struct;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Struct;
 
 import java.util.LinkedList;
 import java.util.List;
diff --git a/flink-python/tox.ini b/flink-python/tox.ini
index 62c2b9b..909854b 100644
--- a/flink-python/tox.ini
+++ b/flink-python/tox.ini
@@ -21,14 +21,14 @@
 # in multiple virtualenvs. This configuration file will run the
 # test suite on all supported python versions.
 # new environments will be excluded by default unless explicitly added to envlist.
-envlist = {py35, py36, py37}-cython
+envlist = {py35, py36, py37, py38}-cython
 
 [testenv]
 whitelist_externals=
     /bin/bash
 deps =
     pytest
-    apache-beam==2.19.0
+    apache-beam==2.23.0
     cython==0.29.16
     grpcio>=1.17.0,<=1.26.0
     grpcio-tools>=1.3.5,<=1.14.2
diff --git a/pom.xml b/pom.xml
index 7a7e586..7afb7a9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -133,8 +133,8 @@ under the License.
 		<powermock.version>2.0.4</powermock.version>
 		<hamcrest.version>1.3</hamcrest.version>
 		<py4j.version>0.10.8.1</py4j.version>
-		<beam.version>2.19.0</beam.version>
-		<protoc.version>3.7.1</protoc.version>
+		<beam.version>2.23.0</beam.version>
+		<protoc.version>3.11.1</protoc.version>
 		<arrow.version>0.16.0</arrow.version>
 		<japicmp.skip>false</japicmp.skip>
 		<flink.convergence.phase>validate</flink.convergence.phase>