You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by hx...@apache.org on 2022/08/11 12:39:10 UTC

[flink] branch master updated: [FLINK-28887][python] Fix the bug of custom metrics in Thread Mode

This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ebb787ff35 [FLINK-28887][python] Fix the bug of custom metrics in Thread Mode
4ebb787ff35 is described below

commit 4ebb787ff354e5c54ea4c55d712bab6220d9ed55
Author: huangxingbo <hx...@apache.org>
AuthorDate: Thu Aug 11 10:35:37 2022 +0800

    [FLINK-28887][python] Fix the bug of custom metrics in Thread Mode
    
    This closes #20540.
---
 flink-python/dev/integration_test.sh               |   3 -
 .../pyflink/datastream/tests/test_data_stream.py   |  50 +++++++
 .../datastream/embedded/runtime_context.py         |   6 +-
 .../fn_execution/datastream/process/operations.py  |   2 +-
 .../tests => fn_execution/metrics}/__init__.py     |   0
 .../metrics/embedded}/__init__.py                  |   0
 .../metrics/embedded/counter_impl.py}              |  24 +++
 .../metrics/embedded/distribution_impl.py}         |  12 ++
 .../metrics/embedded/meter_impl.py}                |  19 +++
 .../fn_execution/metrics/embedded/metric_impl.py   |  61 ++++++++
 .../metrics/process}/__init__.py                   |   0
 .../fn_execution/metrics/process/counter_impl.py}  |  48 +++---
 .../metrics/process/distribution_impl.py}          |  14 ++
 .../fn_execution/metrics/process/meter_impl.py}    |  43 +++---
 .../fn_execution/metrics/process/metric_impl.py    | 103 +++++++++++++
 .../{ => fn_execution}/metrics/tests/__init__.py   |   0
 .../metrics/tests/test_metric.py                   |  13 +-
 .../pyflink/fn_execution/table/operations.py       |   2 +-
 flink-python/pyflink/metrics/metricbase.py         | 161 +++++----------------
 flink-python/setup.py                              |   3 +
 .../python/metric/embedded/MetricDistribution.java |  38 +++++
 .../flink/python/metric/embedded/MetricGauge.java  |  40 +++++
 .../metric/{ => process}/FlinkMetricContainer.java |   2 +-
 .../python/AbstractPythonFunctionOperator.java     |   2 +-
 .../beam/BeamDataStreamPythonFunctionRunner.java   |   2 +-
 .../python/beam/BeamPythonFunctionRunner.java      |   2 +-
 .../python/beam/BeamTablePythonFunctionRunner.java |   2 +-
 .../{ => process}/FlinkMetricContainerTest.java    |   2 +-
 .../PassThroughPythonAggregateFunctionRunner.java  |   2 +-
 .../PassThroughPythonScalarFunctionRunner.java     |   2 +-
 .../PassThroughPythonTableFunctionRunner.java      |   2 +-
 ...ThroughStreamAggregatePythonFunctionRunner.java |   2 +-
 ...amGroupWindowAggregatePythonFunctionRunner.java |   2 +-
 ...ghStreamTableAggregatePythonFunctionRunner.java |   2 +-
 .../flink/table/runtime/utils/PythonTestUtils.java |   2 +-
 35 files changed, 467 insertions(+), 201 deletions(-)

diff --git a/flink-python/dev/integration_test.sh b/flink-python/dev/integration_test.sh
index c358632f217..bb868468f4f 100755
--- a/flink-python/dev/integration_test.sh
+++ b/flink-python/dev/integration_test.sh
@@ -42,8 +42,5 @@ test_module "datastream"
 # test fn_execution module
 test_module "fn_execution"
 
-# test metrics module
-test_module "metrics"
-
 # test table module
 test_module "table"
diff --git a/flink-python/pyflink/datastream/tests/test_data_stream.py b/flink-python/pyflink/datastream/tests/test_data_stream.py
index 79165d40490..7380be28b0f 100644
--- a/flink-python/pyflink/datastream/tests/test_data_stream.py
+++ b/flink-python/pyflink/datastream/tests/test_data_stream.py
@@ -42,6 +42,7 @@ from pyflink.datastream.state import (ValueStateDescriptor, ListStateDescriptor,
                                       AggregatingStateDescriptor, StateTtlConfig)
 from pyflink.datastream.tests.test_util import DataStreamTestSinkFunction
 from pyflink.java_gateway import get_gateway
+from pyflink.metrics import Counter, Meter, Distribution
 from pyflink.testing.test_case_utils import PyFlinkBatchTestCase, PyFlinkStreamingTestCase
 from pyflink.util.java_utils import get_j_env_configuration
 
@@ -1718,6 +1719,55 @@ class EmbeddedDataStreamStreamTests(DataStreamStreamingTests, PyFlinkStreamingTe
         config = get_j_env_configuration(self.env._j_stream_execution_environment)
         config.setString("python.execution-mode", "thread")
 
+    def test_metrics(self):
+        ds = self.env.from_collection(
+            [('ab', 'a', decimal.Decimal(1)),
+             ('bdc', 'a', decimal.Decimal(2)),
+             ('cfgs', 'a', decimal.Decimal(3)),
+             ('deeefg', 'a', decimal.Decimal(4))],
+            type_info=Types.TUPLE(
+                [Types.STRING(), Types.STRING(), Types.BIG_DEC()]))
+
+        class MyMapFunction(MapFunction):
+            def __init__(self):
+                self.counter = None  # type: Counter
+                self.counter_value = 0
+                self.meter = None  # type: Meter
+                self.meter_value = 0
+                self.value_to_expose = 0
+                self.distribution = None  # type: Distribution
+
+            def open(self, runtime_context: RuntimeContext):
+                self.counter = runtime_context.get_metrics_group().counter("my_counter")
+                self.meter = runtime_context.get_metrics_group().meter('my_meter', 1)
+                runtime_context.get_metrics_group().gauge("my_gauge", lambda: self.value_to_expose)
+                self.distribution = runtime_context.get_metrics_group().distribution(
+                    "my_distribution")
+
+            def map(self, value):
+                self.counter.inc()
+                self.counter_value += 1
+                assert self.counter.get_count() == self.counter_value
+
+                self.meter.mark_event(1)
+                self.meter_value += 1
+                assert self.meter.get_count() == self.meter_value
+
+                self.value_to_expose += 1
+
+                self.distribution.update(int(value[2]))
+
+                return Row(value[0], len(value[0]), value[2])
+
+        (ds.key_by(lambda value: value[1])
+         .map(MyMapFunction(),
+              output_type=Types.ROW([Types.STRING(), Types.INT(), Types.BIG_DEC()]))
+         .add_sink(self.test_sink))
+        self.env.execute('test_basic_operations')
+        results = self.test_sink.get_results()
+        expected = ['+I[ab, 2, 1]', '+I[bdc, 3, 2]', '+I[cfgs, 4, 3]', '+I[deeefg, 6, 4]']
+        self.assert_equals_sorted(expected, results)
+
 
 @pytest.mark.skipif(sys.version_info < (3, 7), reason="requires python3.7")
 class EmbeddedDataStreamBatchTests(DataStreamBatchTests, PyFlinkBatchTestCase):
diff --git a/flink-python/pyflink/fn_execution/datastream/embedded/runtime_context.py b/flink-python/pyflink/fn_execution/datastream/embedded/runtime_context.py
index 2d744c06a5f..cb890b4c8a9 100644
--- a/flink-python/pyflink/fn_execution/datastream/embedded/runtime_context.py
+++ b/flink-python/pyflink/fn_execution/datastream/embedded/runtime_context.py
@@ -21,6 +21,8 @@ from pyflink.datastream.state import (AggregatingStateDescriptor, AggregatingSta
                                       MapState, ListStateDescriptor, ListState,
                                       ValueStateDescriptor, ValueState)
 from pyflink.fn_execution.embedded.state_impl import KeyedStateBackend
+from pyflink.fn_execution.metrics.embedded.metric_impl import MetricGroupImpl
+from pyflink.metrics import MetricGroup
 
 
 class StreamingRuntimeContext(RuntimeContext):
@@ -75,8 +77,8 @@ class StreamingRuntimeContext(RuntimeContext):
         """
         return self._job_parameters[key] if key in self._job_parameters else default_value
 
-    def get_metrics_group(self):
-        return self._runtime_context.getMetricGroup()
+    def get_metrics_group(self) -> MetricGroup:
+        return MetricGroupImpl(self._runtime_context.getMetricGroup())
 
     def get_state(self, state_descriptor: ValueStateDescriptor) -> ValueState:
         return self._keyed_state_backend.get_value_state(state_descriptor)
diff --git a/flink-python/pyflink/fn_execution/datastream/process/operations.py b/flink-python/pyflink/fn_execution/datastream/process/operations.py
index 962daf2df70..13258180238 100644
--- a/flink-python/pyflink/fn_execution/datastream/process/operations.py
+++ b/flink-python/pyflink/fn_execution/datastream/process/operations.py
@@ -47,7 +47,7 @@ from pyflink.fn_execution.datastream.process.timerservice_impl import (
     NonKeyedTimerServiceImpl,
 )
 from pyflink.fn_execution.datastream.window.window_operator import WindowOperator
-from pyflink.metrics.metricbase import GenericMetricGroup
+from pyflink.fn_execution.metrics.process.metric_impl import GenericMetricGroup
 
 
 class Operation(operations.OneInputOperation, abc.ABC):
diff --git a/flink-python/pyflink/metrics/tests/__init__.py b/flink-python/pyflink/fn_execution/metrics/__init__.py
similarity index 100%
copy from flink-python/pyflink/metrics/tests/__init__.py
copy to flink-python/pyflink/fn_execution/metrics/__init__.py
diff --git a/flink-python/pyflink/metrics/tests/__init__.py b/flink-python/pyflink/fn_execution/metrics/embedded/__init__.py
similarity index 100%
copy from flink-python/pyflink/metrics/tests/__init__.py
copy to flink-python/pyflink/fn_execution/metrics/embedded/__init__.py
diff --git a/flink-python/pyflink/metrics/tests/__init__.py b/flink-python/pyflink/fn_execution/metrics/embedded/counter_impl.py
similarity index 63%
copy from flink-python/pyflink/metrics/tests/__init__.py
copy to flink-python/pyflink/fn_execution/metrics/embedded/counter_impl.py
index 65b48d4d79b..7950998ebfb 100644
--- a/flink-python/pyflink/metrics/tests/__init__.py
+++ b/flink-python/pyflink/fn_execution/metrics/embedded/counter_impl.py
@@ -15,3 +15,27 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
+from pyflink.metrics import Counter
+
+
+class CounterImpl(Counter):
+    def __init__(self, inner_counter):
+        self._inner_counter = inner_counter
+
+    def inc(self, n: int = 1):
+        """
+        Increment the current count by the given value.
+        """
+        self._inner_counter.inc(n)
+
+    def dec(self, n: int = 1):
+        """
+        Decrement the current count by the given value.
+        """
+        self.inc(-n)
+
+    def get_count(self) -> int:
+        """
+        Returns the current count.
+        """
+        return self._inner_counter.getCount()
diff --git a/flink-python/pyflink/metrics/tests/__init__.py b/flink-python/pyflink/fn_execution/metrics/embedded/distribution_impl.py
similarity index 74%
copy from flink-python/pyflink/metrics/tests/__init__.py
copy to flink-python/pyflink/fn_execution/metrics/embedded/distribution_impl.py
index 65b48d4d79b..bf826772052 100644
--- a/flink-python/pyflink/metrics/tests/__init__.py
+++ b/flink-python/pyflink/fn_execution/metrics/embedded/distribution_impl.py
@@ -15,3 +15,15 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
+from pyflink.metrics import Distribution
+
+
+class DistributionImpl(Distribution):
+    def __init__(self, inner_distribution):
+        self._inner_distribution = inner_distribution
+
+    def update(self, value):
+        """
+        Updates the distribution value.
+        """
+        self._inner_distribution.update(value)
diff --git a/flink-python/pyflink/metrics/tests/__init__.py b/flink-python/pyflink/fn_execution/metrics/embedded/meter_impl.py
similarity index 67%
copy from flink-python/pyflink/metrics/tests/__init__.py
copy to flink-python/pyflink/fn_execution/metrics/embedded/meter_impl.py
index 65b48d4d79b..199f105e685 100644
--- a/flink-python/pyflink/metrics/tests/__init__.py
+++ b/flink-python/pyflink/fn_execution/metrics/embedded/meter_impl.py
@@ -15,3 +15,22 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
+from pyflink.metrics import Meter
+
+
+class MeterImpl(Meter):
+
+    def __init__(self, inner_counter):
+        self._inner_counter = inner_counter
+
+    def mark_event(self, value: int = 1):
+        """
+        Mark occurrence of the specified number of events.
+        """
+        self._inner_counter.markEvent(value)
+
+    def get_count(self) -> int:
+        """
+        Get number of events marked on the meter.
+        """
+        return self._inner_counter.getCount()
diff --git a/flink-python/pyflink/fn_execution/metrics/embedded/metric_impl.py b/flink-python/pyflink/fn_execution/metrics/embedded/metric_impl.py
new file mode 100644
index 00000000000..9081eb029cd
--- /dev/null
+++ b/flink-python/pyflink/fn_execution/metrics/embedded/metric_impl.py
@@ -0,0 +1,61 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from typing import Callable
+
+from pemja import findClass
+
+from pyflink.fn_execution.metrics.embedded.counter_impl import CounterImpl
+from pyflink.fn_execution.metrics.embedded.distribution_impl import DistributionImpl
+from pyflink.fn_execution.metrics.embedded.meter_impl import MeterImpl
+from pyflink.metrics import MetricGroup, Counter, Distribution, Meter
+
+JMeterView = findClass('org.apache.flink.metrics.MeterView')
+JMetricGauge = findClass('org.apache.flink.python.metric.embedded.MetricGauge')
+JMetricDistribution = findClass('org.apache.flink.python.metric.embedded.MetricDistribution')
+
+
+class MetricGroupImpl(MetricGroup):
+
+    def __init__(self, metrics):
+        self._metrics = metrics
+
+    def add_group(self, name: str, extra: str = None) -> 'MetricGroup':
+        if extra is None:
+            return MetricGroupImpl(self._metrics.addGroup(name))
+        else:
+            return MetricGroupImpl(self._metrics.addGroup(name, extra))
+
+    def counter(self, name: str) -> 'Counter':
+        return CounterImpl(self._metrics.counter(name))
+
+    def gauge(self, name: str, obj: Callable[[], int]) -> None:
+        self._metrics.gauge(name, JMetricGauge(PythonGaugeCallable(obj)))
+
+    def meter(self, name: str, time_span_in_seconds: int = 60) -> 'Meter':
+        return MeterImpl(self._metrics.meter(name, JMeterView(time_span_in_seconds)))
+
+    def distribution(self, name: str) -> 'Distribution':
+        return DistributionImpl(self._metrics.gauge(name, JMetricDistribution()))
+
+
+class PythonGaugeCallable(object):
+    def __init__(self, func: Callable):
+        self.func = func
+
+    def get_value(self):
+        return self.func()
diff --git a/flink-python/pyflink/metrics/tests/__init__.py b/flink-python/pyflink/fn_execution/metrics/process/__init__.py
similarity index 100%
copy from flink-python/pyflink/metrics/tests/__init__.py
copy to flink-python/pyflink/fn_execution/metrics/process/__init__.py
diff --git a/flink-python/dev/integration_test.sh b/flink-python/pyflink/fn_execution/metrics/process/counter_impl.py
old mode 100755
new mode 100644
similarity index 54%
copy from flink-python/dev/integration_test.sh
copy to flink-python/pyflink/fn_execution/metrics/process/counter_impl.py
index c358632f217..e0364f98cf1
--- a/flink-python/dev/integration_test.sh
+++ b/flink-python/pyflink/fn_execution/metrics/process/counter_impl.py
@@ -1,4 +1,3 @@
-#!/usr/bin/env bash
 ################################################################################
 #  Licensed to the Apache Software Foundation (ASF) under one
 #  or more contributor license agreements.  See the NOTICE file
@@ -16,34 +15,35 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
+from pyflink.metrics import Counter
 
-function test_module() {
-    module="$FLINK_PYTHON_DIR/pyflink/$1"
-    echo "test module $module"
-    pytest --durations=20 ${module} $2
-    if [[ $? -ne 0 ]]; then
-        echo "test module $module failed"
-        exit 1
-    fi
-}
 
-# CURRENT_DIR is "flink/flink-python/dev/"
-CURRENT_DIR="$(cd "$( dirname "$0" )" && pwd)"
+class CounterImpl(Counter):
+    def __init__(self, inner_counter):
+        self._inner_counter = inner_counter
 
-# FLINK_PYTHON_DIR is "flink/flink-python"
-FLINK_PYTHON_DIR=$(dirname "$CURRENT_DIR")
+    def inc(self, n: int = 1):
+        """
+        Increment the current count by the given value.
 
-# test common module
-test_module "common"
+        .. versionadded:: 1.11.0
+        """
+        self._inner_counter.inc(n)
 
-# test datastream module
-test_module "datastream"
+    def dec(self, n: int = 1):
+        """
+        Decrement the current count by the given value.
 
-# test fn_execution module
-test_module "fn_execution"
+        .. versionadded:: 1.11.0
+        """
+        self.inc(-n)
 
-# test metrics module
-test_module "metrics"
+    def get_count(self) -> int:
+        """
+        Returns the current count.
 
-# test table module
-test_module "table"
+        .. versionadded:: 1.11.0
+        """
+        from apache_beam.metrics.execution import MetricsEnvironment
+        container = MetricsEnvironment.current_container()
+        return container.get_counter(self._inner_counter.metric_name).get_cumulative()
diff --git a/flink-python/pyflink/metrics/tests/__init__.py b/flink-python/pyflink/fn_execution/metrics/process/distribution_impl.py
similarity index 73%
copy from flink-python/pyflink/metrics/tests/__init__.py
copy to flink-python/pyflink/fn_execution/metrics/process/distribution_impl.py
index 65b48d4d79b..5a5273cb983 100644
--- a/flink-python/pyflink/metrics/tests/__init__.py
+++ b/flink-python/pyflink/fn_execution/metrics/process/distribution_impl.py
@@ -15,3 +15,17 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
+from pyflink.metrics import Distribution
+
+
+class DistributionImpl(Distribution):
+    def __init__(self, inner_distribution):
+        self._inner_distribution = inner_distribution
+
+    def update(self, value):
+        """
+        Updates the distribution value.
+
+        .. versionadded:: 1.11.0
+        """
+        self._inner_distribution.update(value)
diff --git a/flink-python/dev/integration_test.sh b/flink-python/pyflink/fn_execution/metrics/process/meter_impl.py
old mode 100755
new mode 100644
similarity index 58%
copy from flink-python/dev/integration_test.sh
copy to flink-python/pyflink/fn_execution/metrics/process/meter_impl.py
index c358632f217..3ee0856d5bf
--- a/flink-python/dev/integration_test.sh
+++ b/flink-python/pyflink/fn_execution/metrics/process/meter_impl.py
@@ -1,4 +1,3 @@
-#!/usr/bin/env bash
 ################################################################################
 #  Licensed to the Apache Software Foundation (ASF) under one
 #  or more contributor license agreements.  See the NOTICE file
@@ -16,34 +15,28 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
+from pyflink.metrics import Meter
 
-function test_module() {
-    module="$FLINK_PYTHON_DIR/pyflink/$1"
-    echo "test module $module"
-    pytest --durations=20 ${module} $2
-    if [[ $? -ne 0 ]]; then
-        echo "test module $module failed"
-        exit 1
-    fi
-}
 
-# CURRENT_DIR is "flink/flink-python/dev/"
-CURRENT_DIR="$(cd "$( dirname "$0" )" && pwd)"
+class MeterImpl(Meter):
 
-# FLINK_PYTHON_DIR is "flink/flink-python"
-FLINK_PYTHON_DIR=$(dirname "$CURRENT_DIR")
+    def __init__(self, inner_counter):
+        self._inner_counter = inner_counter
 
-# test common module
-test_module "common"
+    def mark_event(self, value: int = 1):
+        """
+        Mark occurrence of the specified number of events.
 
-# test datastream module
-test_module "datastream"
+        .. versionadded:: 1.11.0
+        """
+        self._inner_counter.inc(value)
 
-# test fn_execution module
-test_module "fn_execution"
+    def get_count(self) -> int:
+        """
+        Get number of events marked on the meter.
 
-# test metrics module
-test_module "metrics"
-
-# test table module
-test_module "table"
+        .. versionadded:: 1.11.0
+        """
+        from apache_beam.metrics.execution import MetricsEnvironment
+        container = MetricsEnvironment.current_container()
+        return container.get_counter(self._inner_counter.metric_name).get_cumulative()
diff --git a/flink-python/pyflink/fn_execution/metrics/process/metric_impl.py b/flink-python/pyflink/fn_execution/metrics/process/metric_impl.py
new file mode 100644
index 00000000000..c48dfbfcc37
--- /dev/null
+++ b/flink-python/pyflink/fn_execution/metrics/process/metric_impl.py
@@ -0,0 +1,103 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import json
+from enum import Enum
+from typing import Callable, List, Tuple
+
+from pyflink.fn_execution.metrics.process.counter_impl import CounterImpl
+from pyflink.fn_execution.metrics.process.distribution_impl import DistributionImpl
+from pyflink.fn_execution.metrics.process.meter_impl import MeterImpl
+from pyflink.metrics import MetricGroup, Counter, Distribution, Meter
+
+
+class MetricGroupType(Enum):
+    """
+    Indicate the type of MetricGroup.
+    """
+    generic = 0
+    key = 1
+    value = 2
+
+
+class GenericMetricGroup(MetricGroup):
+
+    def __init__(
+            self,
+            parent,
+            name,
+            metric_group_type=MetricGroupType.generic):
+        self._parent = parent
+        self._sub_groups = []
+        self._name = name
+        self._metric_group_type = metric_group_type
+        self._flink_gauge = {}
+        self._beam_gauge = {}
+
+    def add_group(self, name: str, extra: str = None) -> 'MetricGroup':
+        if extra is None:
+            return self._add_group(name, MetricGroupType.generic)
+        else:
+            return self._add_group(name, MetricGroupType.key) \
+                ._add_group(extra, MetricGroupType.value)
+
+    def counter(self, name: str) -> 'Counter':
+        from apache_beam.metrics.metric import Metrics
+        return CounterImpl(Metrics.counter(self._get_namespace(), name))
+
+    def gauge(self, name: str, obj: Callable[[], int]) -> None:
+        from apache_beam.metrics.metric import Metrics
+        self._flink_gauge[name] = obj
+        self._beam_gauge[name] = Metrics.gauge(self._get_namespace(), name)
+
+    def meter(self, name: str, time_span_in_seconds: int = 60) -> 'Meter':
+        from apache_beam.metrics.metric import Metrics
+        # Beam has no meter type, so a counter is used to implement the meter.
+        return MeterImpl(Metrics.counter(self._get_namespace(time_span_in_seconds), name))
+
+    def distribution(self, name: str) -> 'Distribution':
+        from apache_beam.metrics.metric import Metrics
+        return DistributionImpl(Metrics.distribution(self._get_namespace(), name))
+
+    def _add_group(self, name: str, metric_group_type: MetricGroupType) -> 'GenericMetricGroup':
+        for group in self._sub_groups:
+            if name == group._name and metric_group_type == group._metric_group_type:
+                # we don't create same metric group repeatedly
+                return group
+
+        sub_group = GenericMetricGroup(
+            self,
+            name,
+            metric_group_type)
+        self._sub_groups.append(sub_group)
+        return sub_group
+
+    def _get_metric_group_names_and_types(self) -> Tuple[List[str], List[str]]:
+        if self._name is None:
+            return [], []
+        else:
+            names, types = self._parent._get_metric_group_names_and_types()
+            names.append(self._name)
+            types.append(str(self._metric_group_type))
+            return names, types
+
+    def _get_namespace(self, time=None) -> str:
+        names, metric_group_type = self._get_metric_group_names_and_types()
+        names.extend(metric_group_type)
+        if time is not None:
+            names.append(str(time))
+        return json.dumps(names)
diff --git a/flink-python/pyflink/metrics/tests/__init__.py b/flink-python/pyflink/fn_execution/metrics/tests/__init__.py
similarity index 100%
rename from flink-python/pyflink/metrics/tests/__init__.py
rename to flink-python/pyflink/fn_execution/metrics/tests/__init__.py
diff --git a/flink-python/pyflink/metrics/tests/test_metric.py b/flink-python/pyflink/fn_execution/metrics/tests/test_metric.py
similarity index 96%
rename from flink-python/pyflink/metrics/tests/test_metric.py
rename to flink-python/pyflink/fn_execution/metrics/tests/test_metric.py
index 543ed4f2154..ea1f4eb635d 100644
--- a/flink-python/pyflink/metrics/tests/test_metric.py
+++ b/flink-python/pyflink/fn_execution/metrics/tests/test_metric.py
@@ -15,17 +15,16 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-
-from pyflink.metrics.metricbase import GenericMetricGroup, MetricGroup
-from pyflink.table import FunctionContext
-
-from apache_beam.runners.worker import statesampler
-from apache_beam.utils import counters
+from apache_beam.metrics.cells import DistributionData
 from apache_beam.metrics.execution import MetricsContainer
 from apache_beam.metrics.execution import MetricsEnvironment
 from apache_beam.metrics.metricbase import MetricName
-from apache_beam.metrics.cells import DistributionData
+from apache_beam.runners.worker import statesampler
+from apache_beam.utils import counters
 
+from pyflink.fn_execution.metrics.process.metric_impl import GenericMetricGroup
+from pyflink.metrics.metricbase import MetricGroup
+from pyflink.table import FunctionContext
 from pyflink.testing.test_case_utils import PyFlinkTestCase
 
 
diff --git a/flink-python/pyflink/fn_execution/table/operations.py b/flink-python/pyflink/fn_execution/table/operations.py
index 3a77f25497d..4534d3a6849 100644
--- a/flink-python/pyflink/fn_execution/table/operations.py
+++ b/flink-python/pyflink/fn_execution/table/operations.py
@@ -34,7 +34,7 @@ from pyflink.fn_execution.table.window_trigger import EventTimeTrigger, Processi
     CountTrigger
 from pyflink.fn_execution.utils import operation_utils
 from pyflink.fn_execution.utils.operation_utils import extract_user_defined_aggregate_function
-from pyflink.metrics.metricbase import GenericMetricGroup
+from pyflink.fn_execution.metrics.process.metric_impl import GenericMetricGroup
 
 try:
     from pyflink.fn_execution.table.aggregate_fast import RowKeySelector, \
diff --git a/flink-python/pyflink/metrics/metricbase.py b/flink-python/pyflink/metrics/metricbase.py
index c1d1d400d10..985f0447964 100644
--- a/flink-python/pyflink/metrics/metricbase.py
+++ b/flink-python/pyflink/metrics/metricbase.py
@@ -15,13 +15,11 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-import abc
-import json
-from enum import Enum
-from typing import Callable, Tuple, List
+from abc import ABC, abstractmethod
+from typing import Callable
 
 
-class MetricGroup(abc.ABC):
+class MetricGroup(ABC):
     """
     A MetricGroup is a named container for metrics and further metric subgroups.
 
@@ -33,6 +31,7 @@ class MetricGroup(abc.ABC):
     .. versionadded:: 1.11.0
     """
 
+    @abstractmethod
     def add_group(self, name: str, extra: str = None) -> 'MetricGroup':
         """
         Creates a new MetricGroup and adds it to this groups sub-groups.
@@ -46,6 +45,7 @@ class MetricGroup(abc.ABC):
         """
         pass
 
+    @abstractmethod
     def counter(self, name: str) -> 'Counter':
         """
         Registers a new `Counter` with Flink.
@@ -54,6 +54,7 @@ class MetricGroup(abc.ABC):
         """
         pass
 
+    @abstractmethod
     def gauge(self, name: str, obj: Callable[[], int]) -> None:
         """
         Registers a new `Gauge` with Flink.
@@ -62,15 +63,16 @@ class MetricGroup(abc.ABC):
         """
         pass
 
+    @abstractmethod
     def meter(self, name: str, time_span_in_seconds: int = 60) -> 'Meter':
         """
         Registers a new `Meter` with Flink.
 
         .. versionadded:: 1.11.0
         """
-        # There is no meter type in Beam, use counter to implement meter
         pass
 
+    @abstractmethod
     def distribution(self, name: str) -> 'Distribution':
         """
         Registers a new `Distribution` with Flink.
@@ -80,86 +82,7 @@ class MetricGroup(abc.ABC):
         pass
 
 
-class MetricGroupType(Enum):
-    """
-    Indicate the type of MetricGroup.
-    """
-    generic = 0
-    key = 1
-    value = 2
-
-
-class GenericMetricGroup(MetricGroup):
-
-    def __init__(
-            self,
-            parent,
-            name,
-            metric_group_type=MetricGroupType.generic):
-        self._parent = parent
-        self._sub_groups = []
-        self._name = name
-        self._metric_group_type = metric_group_type
-        self._flink_gauge = {}
-        self._beam_gauge = {}
-
-    def _add_group(self, name: str, metric_group_type: MetricGroupType) \
-            -> 'GenericMetricGroup':
-        for group in self._sub_groups:
-            if name == group._name and metric_group_type == group._metric_group_type:
-                # we don't create same metric group repeatedly
-                return group
-
-        sub_group = GenericMetricGroup(
-            self,
-            name,
-            metric_group_type)
-        self._sub_groups.append(sub_group)
-        return sub_group
-
-    def add_group(self, name: str, extra: str = None) -> 'MetricGroup':
-        if extra is None:
-            return self._add_group(name, MetricGroupType.generic)
-        else:
-            return self._add_group(name, MetricGroupType.key)\
-                ._add_group(extra, MetricGroupType.value)
-
-    def counter(self, name: str) -> 'Counter':
-        from apache_beam.metrics.metric import Metrics
-        return Counter(Metrics.counter(self._get_namespace(), name))
-
-    def gauge(self, name: str, obj: Callable[[], int]) -> None:
-        from apache_beam.metrics.metric import Metrics
-        self._flink_gauge[name] = obj
-        self._beam_gauge[name] = Metrics.gauge(self._get_namespace(), name)
-
-    def meter(self, name: str, time_span_in_seconds: int = 60) -> 'Meter':
-        from apache_beam.metrics.metric import Metrics
-        # There is no meter type in Beam, use counter to implement meter
-        return Meter(Metrics.counter(self._get_namespace(time_span_in_seconds), name))
-
-    def distribution(self, name: str) -> 'Distribution':
-        from apache_beam.metrics.metric import Metrics
-        return Distribution(Metrics.distribution(self._get_namespace(), name))
-
-    def _get_metric_group_names_and_types(self) -> Tuple[List[str], List[str]]:
-        if self._name is None:
-            return [], []
-        else:
-            names, types = self._parent._get_metric_group_names_and_types()
-            names.append(self._name)
-            types.append(str(self._metric_group_type))
-            return names, types
-
-    def _get_namespace(self, time=None) -> str:
-        names, metric_group_type = self._get_metric_group_names_and_types()
-        names.extend(metric_group_type)
-        if time is not None:
-            names.append(str(time))
-        return json.dumps(names)
-
-
-class Metric(object):
+class Metric(ABC):
     """
     Base interface of a metric object.
 
@@ -168,7 +91,7 @@ class Metric(object):
     pass
 
 
-class Counter(Metric):
+class Counter(Metric, ABC):
     """
     Counter metric interface. Allows a count to be incremented/decremented
     during pipeline execution.
@@ -176,59 +99,35 @@ class Counter(Metric):
     .. versionadded:: 1.11.0
     """
 
-    def __init__(self, inner_counter):
-        self._inner_counter = inner_counter
-
+    @abstractmethod
     def inc(self, n: int = 1):
         """
         Increment the current count by the given value.
 
         .. versionadded:: 1.11.0
         """
-        self._inner_counter.inc(n)
+        pass
 
+    @abstractmethod
     def dec(self, n: int = 1):
         """
         Decrement the current count by the given value.
 
         .. versionadded:: 1.11.0
         """
-        self.inc(-n)
+        pass
 
+    @abstractmethod
     def get_count(self) -> int:
         """
         Returns the current count.
 
         .. versionadded:: 1.11.0
         """
-        from apache_beam.metrics.execution import MetricsEnvironment
-        container = MetricsEnvironment.current_container()
-        return container.get_counter(self._inner_counter.metric_name).get_cumulative()
-
-
-class Distribution(Metric):
-    """
-    Distribution Metric interface.
-
-    Allows statistics about the distribution of a variable to be collected during
-    pipeline execution.
-
-    .. versionadded:: 1.11.0
-    """
-
-    def __init__(self, inner_distribution):
-        self._inner_distribution = inner_distribution
-
-    def update(self, value):
-        """
-        Updates the distribution value.
-
-        .. versionadded:: 1.11.0
-        """
-        self._inner_distribution.update(value)
+        pass
 
 
-class Meter(Metric):
+class Meter(Metric, ABC):
     """
     Meter Metric interface.
 
@@ -237,23 +136,35 @@ class Meter(Metric):
     .. versionadded:: 1.11.0
     """
 
-    def __init__(self, inner_counter):
-        self._inner_counter = inner_counter
-
+    @abstractmethod
     def mark_event(self, value: int = 1):
         """
         Mark occurrence of the specified number of events.
 
         .. versionadded:: 1.11.0
         """
-        self._inner_counter.inc(value)
+        pass
 
+    @abstractmethod
     def get_count(self) -> int:
         """
         Get number of events marked on the meter.
 
         .. versionadded:: 1.11.0
         """
-        from apache_beam.metrics.execution import MetricsEnvironment
-        container = MetricsEnvironment.current_container()
-        return container.get_counter(self._inner_counter.metric_name).get_cumulative()
+        pass
+
+
+class Distribution(Metric, ABC):
+    """
+    Distribution Metric interface.
+
+    Allows statistics about the distribution of a variable to be collected during
+    pipeline execution.
+
+    .. versionadded:: 1.11.0
+    """
+
+    @abstractmethod
+    def update(self, value):
+        pass
diff --git a/flink-python/setup.py b/flink-python/setup.py
index 82e5e9b4488..e524c369256 100644
--- a/flink-python/setup.py
+++ b/flink-python/setup.py
@@ -283,6 +283,9 @@ try:
                 'pyflink.fn_execution.datastream.window',
                 'pyflink.fn_execution.embedded',
                 'pyflink.fn_execution.formats',
+                'pyflink.fn_execution.metrics',
+                'pyflink.fn_execution.metrics.embedded',
+                'pyflink.fn_execution.metrics.process',
                 'pyflink.fn_execution.table',
                 'pyflink.fn_execution.utils',
                 'pyflink.metrics',
diff --git a/flink-python/src/main/java/org/apache/flink/python/metric/embedded/MetricDistribution.java b/flink-python/src/main/java/org/apache/flink/python/metric/embedded/MetricDistribution.java
new file mode 100644
index 00000000000..5cd13889759
--- /dev/null
+++ b/flink-python/src/main/java/org/apache/flink/python/metric/embedded/MetricDistribution.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python.metric.embedded;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Gauge;
+
+/** Flink {@link Gauge} for Python Distribution. */
+@Internal
+public class MetricDistribution implements Gauge<Long> {
+
+    private long value;
+
+    public void update(long value) {
+        this.value = value;
+    }
+
+    @Override
+    public Long getValue() {
+        return value;
+    }
+}
diff --git a/flink-python/src/main/java/org/apache/flink/python/metric/embedded/MetricGauge.java b/flink-python/src/main/java/org/apache/flink/python/metric/embedded/MetricGauge.java
new file mode 100644
index 00000000000..afa4d683a4c
--- /dev/null
+++ b/flink-python/src/main/java/org/apache/flink/python/metric/embedded/MetricGauge.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python.metric.embedded;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Gauge;
+
+import pemja.core.object.PyObject;
+
+/** Flink {@link Gauge} for Python Gauge. */
+@Internal
+public class MetricGauge implements Gauge<Long> {
+
+    private final PyObject callable;
+
+    public MetricGauge(PyObject callable) {
+        this.callable = callable;
+    }
+
+    @Override
+    public Long getValue() {
+        return (Long) callable.invokeMethod("get_value");
+    }
+}
diff --git a/flink-python/src/main/java/org/apache/flink/python/metric/FlinkMetricContainer.java b/flink-python/src/main/java/org/apache/flink/python/metric/process/FlinkMetricContainer.java
similarity index 99%
rename from flink-python/src/main/java/org/apache/flink/python/metric/FlinkMetricContainer.java
rename to flink-python/src/main/java/org/apache/flink/python/metric/process/FlinkMetricContainer.java
index df2f84a283b..4475ca1af6b 100644
--- a/flink-python/src/main/java/org/apache/flink/python/metric/FlinkMetricContainer.java
+++ b/flink-python/src/main/java/org/apache/flink/python/metric/process/FlinkMetricContainer.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.python.metric;
+package org.apache.flink.python.metric.process;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.GlobalConfiguration;
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
index e4af2f405dd..2c8ee0ea4d6 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
@@ -21,7 +21,7 @@ package org.apache.flink.streaming.api.operators.python;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.python.env.PythonEnvironmentManager;
-import org.apache.flink.python.metric.FlinkMetricContainer;
+import org.apache.flink.python.metric.process.FlinkMetricContainer;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java
index b090ca98ab6..38644c79cb7 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java
@@ -22,7 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.fnexecution.v1.FlinkFnApi;
 import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
-import org.apache.flink.python.metric.FlinkMetricContainer;
+import org.apache.flink.python.metric.process.FlinkMetricContainer;
 import org.apache.flink.python.util.ProtoUtils;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.state.KeyedStateBackend;
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
index 20b3d45a3f8..474b633b24d 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
@@ -30,7 +30,7 @@ import org.apache.flink.python.PythonOptions;
 import org.apache.flink.python.env.PythonEnvironment;
 import org.apache.flink.python.env.process.ProcessPythonEnvironment;
 import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
-import org.apache.flink.python.metric.FlinkMetricContainer;
+import org.apache.flink.python.metric.process.FlinkMetricContainer;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.memory.OpaqueMemoryResource;
 import org.apache.flink.runtime.state.KeyedStateBackend;
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonFunctionRunner.java
index 888c3c4f9be..ac340616ce2 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonFunctionRunner.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonFunctionRunner.java
@@ -22,7 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.fnexecution.v1.FlinkFnApi;
 import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
-import org.apache.flink.python.metric.FlinkMetricContainer;
+import org.apache.flink.python.metric.process.FlinkMetricContainer;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner;
diff --git a/flink-python/src/test/java/org/apache/flink/python/metric/FlinkMetricContainerTest.java b/flink-python/src/test/java/org/apache/flink/python/metric/process/FlinkMetricContainerTest.java
similarity index 99%
rename from flink-python/src/test/java/org/apache/flink/python/metric/FlinkMetricContainerTest.java
rename to flink-python/src/test/java/org/apache/flink/python/metric/process/FlinkMetricContainerTest.java
index a48bdd51f56..9645eac91fb 100644
--- a/flink-python/src/test/java/org/apache/flink/python/metric/FlinkMetricContainerTest.java
+++ b/flink-python/src/test/java/org/apache/flink/python/metric/process/FlinkMetricContainerTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.python.metric;
+package org.apache.flink.python.metric.process;
 
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.metrics.Meter;
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonAggregateFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonAggregateFunctionRunner.java
index 2e9abd43fe3..a2ef65b7f5f 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonAggregateFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonAggregateFunctionRunner.java
@@ -26,7 +26,7 @@ import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.fnexecution.v1.FlinkFnApi;
 import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
-import org.apache.flink.python.metric.FlinkMetricContainer;
+import org.apache.flink.python.metric.process.FlinkMetricContainer;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.arrow.serializers.ArrowSerializer;
 import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctionRunner;
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java
index 661805a570a..599fb9d511b 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.runtime.utils;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.fnexecution.v1.FlinkFnApi;
 import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
-import org.apache.flink.python.metric.FlinkMetricContainer;
+import org.apache.flink.python.metric.process.FlinkMetricContainer;
 import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctionRunner;
 import org.apache.flink.table.types.logical.RowType;
 
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java
index e7bca6e3032..2125a1fd1b3 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.runtime.utils;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.fnexecution.v1.FlinkFnApi;
 import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
-import org.apache.flink.python.metric.FlinkMetricContainer;
+import org.apache.flink.python.metric.process.FlinkMetricContainer;
 import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctionRunner;
 import org.apache.flink.table.types.logical.RowType;
 
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamAggregatePythonFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamAggregatePythonFunctionRunner.java
index e4a75d52f26..7c72e623bfa 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamAggregatePythonFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamAggregatePythonFunctionRunner.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.fnexecution.v1.FlinkFnApi;
 import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
-import org.apache.flink.python.metric.FlinkMetricContainer;
+import org.apache.flink.python.metric.process.FlinkMetricContainer;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctionRunner;
 import org.apache.flink.table.types.logical.RowType;
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamGroupWindowAggregatePythonFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamGroupWindowAggregatePythonFunctionRunner.java
index 4ab78479282..39581f170d0 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamGroupWindowAggregatePythonFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamGroupWindowAggregatePythonFunctionRunner.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.runtime.utils;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.fnexecution.v1.FlinkFnApi;
 import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
-import org.apache.flink.python.metric.FlinkMetricContainer;
+import org.apache.flink.python.metric.process.FlinkMetricContainer;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.table.runtime.operators.python.aggregate.PassThroughPythonStreamGroupWindowAggregateOperator;
 import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctionRunner;
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamTableAggregatePythonFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamTableAggregatePythonFunctionRunner.java
index 28c24170281..f8b79db3a80 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamTableAggregatePythonFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamTableAggregatePythonFunctionRunner.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.fnexecution.v1.FlinkFnApi;
 import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
-import org.apache.flink.python.metric.FlinkMetricContainer;
+import org.apache.flink.python.metric.process.FlinkMetricContainer;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctionRunner;
 import org.apache.flink.table.types.logical.RowType;
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PythonTestUtils.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PythonTestUtils.java
index 932d9fa9dfa..9054e71a0c5 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PythonTestUtils.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PythonTestUtils.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.runtime.utils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.python.env.PythonDependencyInfo;
 import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
-import org.apache.flink.python.metric.FlinkMetricContainer;
+import org.apache.flink.python.metric.process.FlinkMetricContainer;
 import org.apache.flink.python.util.PythonEnvironmentManagerUtils;
 import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
 import org.apache.flink.runtime.metrics.groups.GenericMetricGroup;