You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by hx...@apache.org on 2022/08/11 12:39:10 UTC
[flink] branch master updated: [FLINK-28887][python] Fix the bug of custom metrics in Thread Mode
This is an automated email from the ASF dual-hosted git repository.
hxb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 4ebb787ff35 [FLINK-28887][python] Fix the bug of custom metrics in Thread Mode
4ebb787ff35 is described below
commit 4ebb787ff354e5c54ea4c55d712bab6220d9ed55
Author: huangxingbo <hx...@apache.org>
AuthorDate: Thu Aug 11 10:35:37 2022 +0800
[FLINK-28887][python] Fix the bug of custom metrics in Thread Mode
This closes #20540.
---
flink-python/dev/integration_test.sh | 3 -
.../pyflink/datastream/tests/test_data_stream.py | 50 +++++++
.../datastream/embedded/runtime_context.py | 6 +-
.../fn_execution/datastream/process/operations.py | 2 +-
.../tests => fn_execution/metrics}/__init__.py | 0
.../metrics/embedded}/__init__.py | 0
.../metrics/embedded/counter_impl.py} | 24 +++
.../metrics/embedded/distribution_impl.py} | 12 ++
.../metrics/embedded/meter_impl.py} | 19 +++
.../fn_execution/metrics/embedded/metric_impl.py | 61 ++++++++
.../metrics/process}/__init__.py | 0
.../fn_execution/metrics/process/counter_impl.py} | 48 +++---
.../metrics/process/distribution_impl.py} | 14 ++
.../fn_execution/metrics/process/meter_impl.py} | 43 +++---
.../fn_execution/metrics/process/metric_impl.py | 103 +++++++++++++
.../{ => fn_execution}/metrics/tests/__init__.py | 0
.../metrics/tests/test_metric.py | 13 +-
.../pyflink/fn_execution/table/operations.py | 2 +-
flink-python/pyflink/metrics/metricbase.py | 161 +++++----------------
flink-python/setup.py | 3 +
.../python/metric/embedded/MetricDistribution.java | 38 +++++
.../flink/python/metric/embedded/MetricGauge.java | 40 +++++
.../metric/{ => process}/FlinkMetricContainer.java | 2 +-
.../python/AbstractPythonFunctionOperator.java | 2 +-
.../beam/BeamDataStreamPythonFunctionRunner.java | 2 +-
.../python/beam/BeamPythonFunctionRunner.java | 2 +-
.../python/beam/BeamTablePythonFunctionRunner.java | 2 +-
.../{ => process}/FlinkMetricContainerTest.java | 2 +-
.../PassThroughPythonAggregateFunctionRunner.java | 2 +-
.../PassThroughPythonScalarFunctionRunner.java | 2 +-
.../PassThroughPythonTableFunctionRunner.java | 2 +-
...ThroughStreamAggregatePythonFunctionRunner.java | 2 +-
...amGroupWindowAggregatePythonFunctionRunner.java | 2 +-
...ghStreamTableAggregatePythonFunctionRunner.java | 2 +-
.../flink/table/runtime/utils/PythonTestUtils.java | 2 +-
35 files changed, 467 insertions(+), 201 deletions(-)
diff --git a/flink-python/dev/integration_test.sh b/flink-python/dev/integration_test.sh
index c358632f217..bb868468f4f 100755
--- a/flink-python/dev/integration_test.sh
+++ b/flink-python/dev/integration_test.sh
@@ -42,8 +42,5 @@ test_module "datastream"
# test fn_execution module
test_module "fn_execution"
-# test metrics module
-test_module "metrics"
-
# test table module
test_module "table"
diff --git a/flink-python/pyflink/datastream/tests/test_data_stream.py b/flink-python/pyflink/datastream/tests/test_data_stream.py
index 79165d40490..7380be28b0f 100644
--- a/flink-python/pyflink/datastream/tests/test_data_stream.py
+++ b/flink-python/pyflink/datastream/tests/test_data_stream.py
@@ -42,6 +42,7 @@ from pyflink.datastream.state import (ValueStateDescriptor, ListStateDescriptor,
AggregatingStateDescriptor, StateTtlConfig)
from pyflink.datastream.tests.test_util import DataStreamTestSinkFunction
from pyflink.java_gateway import get_gateway
+from pyflink.metrics import Counter, Meter, Distribution
from pyflink.testing.test_case_utils import PyFlinkBatchTestCase, PyFlinkStreamingTestCase
from pyflink.util.java_utils import get_j_env_configuration
@@ -1718,6 +1719,55 @@ class EmbeddedDataStreamStreamTests(DataStreamStreamingTests, PyFlinkStreamingTe
config = get_j_env_configuration(self.env._j_stream_execution_environment)
config.setString("python.execution-mode", "thread")
+ def test_metrics(self):
+ ds = self.env.from_collection(
+ [('ab', 'a', decimal.Decimal(1)),
+ ('bdc', 'a', decimal.Decimal(2)),
+ ('cfgs', 'a', decimal.Decimal(3)),
+ ('deeefg', 'a', decimal.Decimal(4))],
+ type_info=Types.TUPLE(
+ [Types.STRING(), Types.STRING(), Types.BIG_DEC()]))
+
+ class MyMapFunction(MapFunction):
+ def __init__(self):
+ self.counter = None # type: Counter
+ self.counter_value = 0
+ self.meter = None # type: Meter
+ self.meter_value = 0
+ self.value_to_expose = 0
+ self.distribution = None # type: Distribution
+
+ def open(self, runtime_context: RuntimeContext):
+ self.counter = runtime_context.get_metrics_group().counter("my_counter")
+ self.meter = runtime_context.get_metrics_group().meter('my_meter', 1)
+ runtime_context.get_metrics_group().gauge("my_gauge", lambda: self.value_to_expose)
+ self.distribution = runtime_context.get_metrics_group().distribution(
+ "my_distribution")
+
+ def map(self, value):
+ self.counter.inc()
+ self.counter_value += 1
+ assert self.counter.get_count() == self.counter_value
+
+ self.meter.mark_event(1)
+ self.meter_value += 1
+ assert self.meter.get_count() == self.meter_value
+
+ self.value_to_expose += 1
+
+ self.distribution.update(int(value[2]))
+
+ return Row(value[0], len(value[0]), value[2])
+
+ (ds.key_by(lambda value: value[1])
+ .map(MyMapFunction(),
+ output_type=Types.ROW([Types.STRING(), Types.INT(), Types.BIG_DEC()]))
+ .add_sink(self.test_sink))
+ self.env.execute('test_basic_operations')
+ results = self.test_sink.get_results()
+ expected = ['+I[ab, 2, 1]', '+I[bdc, 3, 2]', '+I[cfgs, 4, 3]', '+I[deeefg, 6, 4]']
+ self.assert_equals_sorted(expected, results)
+
@pytest.mark.skipif(sys.version_info < (3, 7), reason="requires python3.7")
class EmbeddedDataStreamBatchTests(DataStreamBatchTests, PyFlinkBatchTestCase):
diff --git a/flink-python/pyflink/fn_execution/datastream/embedded/runtime_context.py b/flink-python/pyflink/fn_execution/datastream/embedded/runtime_context.py
index 2d744c06a5f..cb890b4c8a9 100644
--- a/flink-python/pyflink/fn_execution/datastream/embedded/runtime_context.py
+++ b/flink-python/pyflink/fn_execution/datastream/embedded/runtime_context.py
@@ -21,6 +21,8 @@ from pyflink.datastream.state import (AggregatingStateDescriptor, AggregatingSta
MapState, ListStateDescriptor, ListState,
ValueStateDescriptor, ValueState)
from pyflink.fn_execution.embedded.state_impl import KeyedStateBackend
+from pyflink.fn_execution.metrics.embedded.metric_impl import MetricGroupImpl
+from pyflink.metrics import MetricGroup
class StreamingRuntimeContext(RuntimeContext):
@@ -75,8 +77,8 @@ class StreamingRuntimeContext(RuntimeContext):
"""
return self._job_parameters[key] if key in self._job_parameters else default_value
- def get_metrics_group(self):
- return self._runtime_context.getMetricGroup()
+ def get_metrics_group(self) -> MetricGroup:
+ return MetricGroupImpl(self._runtime_context.getMetricGroup())
def get_state(self, state_descriptor: ValueStateDescriptor) -> ValueState:
return self._keyed_state_backend.get_value_state(state_descriptor)
diff --git a/flink-python/pyflink/fn_execution/datastream/process/operations.py b/flink-python/pyflink/fn_execution/datastream/process/operations.py
index 962daf2df70..13258180238 100644
--- a/flink-python/pyflink/fn_execution/datastream/process/operations.py
+++ b/flink-python/pyflink/fn_execution/datastream/process/operations.py
@@ -47,7 +47,7 @@ from pyflink.fn_execution.datastream.process.timerservice_impl import (
NonKeyedTimerServiceImpl,
)
from pyflink.fn_execution.datastream.window.window_operator import WindowOperator
-from pyflink.metrics.metricbase import GenericMetricGroup
+from pyflink.fn_execution.metrics.process.metric_impl import GenericMetricGroup
class Operation(operations.OneInputOperation, abc.ABC):
diff --git a/flink-python/pyflink/metrics/tests/__init__.py b/flink-python/pyflink/fn_execution/metrics/__init__.py
similarity index 100%
copy from flink-python/pyflink/metrics/tests/__init__.py
copy to flink-python/pyflink/fn_execution/metrics/__init__.py
diff --git a/flink-python/pyflink/metrics/tests/__init__.py b/flink-python/pyflink/fn_execution/metrics/embedded/__init__.py
similarity index 100%
copy from flink-python/pyflink/metrics/tests/__init__.py
copy to flink-python/pyflink/fn_execution/metrics/embedded/__init__.py
diff --git a/flink-python/pyflink/metrics/tests/__init__.py b/flink-python/pyflink/fn_execution/metrics/embedded/counter_impl.py
similarity index 63%
copy from flink-python/pyflink/metrics/tests/__init__.py
copy to flink-python/pyflink/fn_execution/metrics/embedded/counter_impl.py
index 65b48d4d79b..7950998ebfb 100644
--- a/flink-python/pyflink/metrics/tests/__init__.py
+++ b/flink-python/pyflink/fn_execution/metrics/embedded/counter_impl.py
@@ -15,3 +15,27 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
+from pyflink.metrics import Counter
+
+
+class CounterImpl(Counter):
+ def __init__(self, inner_counter):
+ self._inner_counter = inner_counter
+
+ def inc(self, n: int = 1):
+ """
+ Increment the current count by the given value.
+ """
+ self._inner_counter.inc(n)
+
+ def dec(self, n: int = 1):
+ """
+ Decrement the current count by 1.
+ """
+ self.inc(-n)
+
+ def get_count(self) -> int:
+ """
+ Returns the current count.
+ """
+ return self._inner_counter.getCount()
diff --git a/flink-python/pyflink/metrics/tests/__init__.py b/flink-python/pyflink/fn_execution/metrics/embedded/distribution_impl.py
similarity index 74%
copy from flink-python/pyflink/metrics/tests/__init__.py
copy to flink-python/pyflink/fn_execution/metrics/embedded/distribution_impl.py
index 65b48d4d79b..bf826772052 100644
--- a/flink-python/pyflink/metrics/tests/__init__.py
+++ b/flink-python/pyflink/fn_execution/metrics/embedded/distribution_impl.py
@@ -15,3 +15,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
+from pyflink.metrics import Distribution
+
+
+class DistributionImpl(Distribution):
+ def __init__(self, inner_distribution):
+ self._inner_distribution = inner_distribution
+
+ def update(self, value):
+ """
+ Updates the distribution value.
+ """
+ self._inner_distribution.update(value)
diff --git a/flink-python/pyflink/metrics/tests/__init__.py b/flink-python/pyflink/fn_execution/metrics/embedded/meter_impl.py
similarity index 67%
copy from flink-python/pyflink/metrics/tests/__init__.py
copy to flink-python/pyflink/fn_execution/metrics/embedded/meter_impl.py
index 65b48d4d79b..199f105e685 100644
--- a/flink-python/pyflink/metrics/tests/__init__.py
+++ b/flink-python/pyflink/fn_execution/metrics/embedded/meter_impl.py
@@ -15,3 +15,22 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
+from pyflink.metrics import Meter
+
+
+class MeterImpl(Meter):
+
+ def __init__(self, inner_counter):
+ self._inner_counter = inner_counter
+
+ def mark_event(self, value: int = 1):
+ """
+ Mark occurrence of the specified number of events.
+ """
+ self._inner_counter.markEvent(value)
+
+ def get_count(self) -> int:
+ """
+ Get number of events marked on the meter.
+ """
+ return self._inner_counter.getCount()
diff --git a/flink-python/pyflink/fn_execution/metrics/embedded/metric_impl.py b/flink-python/pyflink/fn_execution/metrics/embedded/metric_impl.py
new file mode 100644
index 00000000000..9081eb029cd
--- /dev/null
+++ b/flink-python/pyflink/fn_execution/metrics/embedded/metric_impl.py
@@ -0,0 +1,61 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from typing import Callable
+
+from pemja import findClass
+
+from pyflink.fn_execution.metrics.embedded.counter_impl import CounterImpl
+from pyflink.fn_execution.metrics.embedded.distribution_impl import DistributionImpl
+from pyflink.fn_execution.metrics.embedded.meter_impl import MeterImpl
+from pyflink.metrics import MetricGroup, Counter, Distribution, Meter
+
+JMeterView = findClass('org.apache.flink.metrics.MeterView')
+JMetricGauge = findClass('org.apache.flink.python.metric.embedded.MetricGauge')
+JMetricDistribution = findClass('org.apache.flink.python.metric.embedded.MetricDistribution')
+
+
+class MetricGroupImpl(MetricGroup):
+
+ def __init__(self, metrics):
+ self._metrics = metrics
+
+ def add_group(self, name: str, extra: str = None) -> 'MetricGroup':
+ if extra is None:
+ return MetricGroupImpl(self._metrics.addGroup(name))
+ else:
+ return MetricGroupImpl(self._metrics.addGroup(name, extra))
+
+ def counter(self, name: str) -> 'Counter':
+ return CounterImpl(self._metrics.counter(name))
+
+ def gauge(self, name: str, obj: Callable[[], int]) -> None:
+ self._metrics.gauge(name, JMetricGauge(PythonGaugeCallable(obj)))
+
+ def meter(self, name: str, time_span_in_seconds: int = 60) -> 'Meter':
+ return MeterImpl(self._metrics.meter(name, JMeterView(time_span_in_seconds)))
+
+ def distribution(self, name: str) -> 'Distribution':
+ return DistributionImpl(self._metrics.gauge(name, JMetricDistribution()))
+
+
+class PythonGaugeCallable(object):
+ def __init__(self, func: Callable):
+ self.func = func
+
+ def get_value(self):
+ return self.func()
diff --git a/flink-python/pyflink/metrics/tests/__init__.py b/flink-python/pyflink/fn_execution/metrics/process/__init__.py
similarity index 100%
copy from flink-python/pyflink/metrics/tests/__init__.py
copy to flink-python/pyflink/fn_execution/metrics/process/__init__.py
diff --git a/flink-python/dev/integration_test.sh b/flink-python/pyflink/fn_execution/metrics/process/counter_impl.py
old mode 100755
new mode 100644
similarity index 54%
copy from flink-python/dev/integration_test.sh
copy to flink-python/pyflink/fn_execution/metrics/process/counter_impl.py
index c358632f217..e0364f98cf1
--- a/flink-python/dev/integration_test.sh
+++ b/flink-python/pyflink/fn_execution/metrics/process/counter_impl.py
@@ -1,4 +1,3 @@
-#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
@@ -16,34 +15,35 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
+from pyflink.metrics import Counter
-function test_module() {
- module="$FLINK_PYTHON_DIR/pyflink/$1"
- echo "test module $module"
- pytest --durations=20 ${module} $2
- if [[ $? -ne 0 ]]; then
- echo "test module $module failed"
- exit 1
- fi
-}
-# CURRENT_DIR is "flink/flink-python/dev/"
-CURRENT_DIR="$(cd "$( dirname "$0" )" && pwd)"
+class CounterImpl(Counter):
+ def __init__(self, inner_counter):
+ self._inner_counter = inner_counter
-# FLINK_PYTHON_DIR is "flink/flink-python"
-FLINK_PYTHON_DIR=$(dirname "$CURRENT_DIR")
+ def inc(self, n: int = 1):
+ """
+ Increment the current count by the given value.
-# test common module
-test_module "common"
+ .. versionadded:: 1.11.0
+ """
+ self._inner_counter.inc(n)
-# test datastream module
-test_module "datastream"
+ def dec(self, n: int = 1):
+ """
+ Decrement the current count by 1.
-# test fn_execution module
-test_module "fn_execution"
+ .. versionadded:: 1.11.0
+ """
+ self.inc(-n)
-# test metrics module
-test_module "metrics"
+ def get_count(self) -> int:
+ """
+ Returns the current count.
-# test table module
-test_module "table"
+ .. versionadded:: 1.11.0
+ """
+ from apache_beam.metrics.execution import MetricsEnvironment
+ container = MetricsEnvironment.current_container()
+ return container.get_counter(self._inner_counter.metric_name).get_cumulative()
diff --git a/flink-python/pyflink/metrics/tests/__init__.py b/flink-python/pyflink/fn_execution/metrics/process/distribution_impl.py
similarity index 73%
copy from flink-python/pyflink/metrics/tests/__init__.py
copy to flink-python/pyflink/fn_execution/metrics/process/distribution_impl.py
index 65b48d4d79b..5a5273cb983 100644
--- a/flink-python/pyflink/metrics/tests/__init__.py
+++ b/flink-python/pyflink/fn_execution/metrics/process/distribution_impl.py
@@ -15,3 +15,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
+from pyflink.metrics import Distribution
+
+
+class DistributionImpl(Distribution):
+ def __init__(self, inner_distribution):
+ self._inner_distribution = inner_distribution
+
+ def update(self, value):
+ """
+ Updates the distribution value.
+
+ .. versionadded:: 1.11.0
+ """
+ self._inner_distribution.update(value)
diff --git a/flink-python/dev/integration_test.sh b/flink-python/pyflink/fn_execution/metrics/process/meter_impl.py
old mode 100755
new mode 100644
similarity index 58%
copy from flink-python/dev/integration_test.sh
copy to flink-python/pyflink/fn_execution/metrics/process/meter_impl.py
index c358632f217..3ee0856d5bf
--- a/flink-python/dev/integration_test.sh
+++ b/flink-python/pyflink/fn_execution/metrics/process/meter_impl.py
@@ -1,4 +1,3 @@
-#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
@@ -16,34 +15,28 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
+from pyflink.metrics import Meter
-function test_module() {
- module="$FLINK_PYTHON_DIR/pyflink/$1"
- echo "test module $module"
- pytest --durations=20 ${module} $2
- if [[ $? -ne 0 ]]; then
- echo "test module $module failed"
- exit 1
- fi
-}
-# CURRENT_DIR is "flink/flink-python/dev/"
-CURRENT_DIR="$(cd "$( dirname "$0" )" && pwd)"
+class MeterImpl(Meter):
-# FLINK_PYTHON_DIR is "flink/flink-python"
-FLINK_PYTHON_DIR=$(dirname "$CURRENT_DIR")
+ def __init__(self, inner_counter):
+ self._inner_counter = inner_counter
-# test common module
-test_module "common"
+ def mark_event(self, value: int = 1):
+ """
+ Mark occurrence of the specified number of events.
-# test datastream module
-test_module "datastream"
+ .. versionadded:: 1.11.0
+ """
+ self._inner_counter.inc(value)
-# test fn_execution module
-test_module "fn_execution"
+ def get_count(self) -> int:
+ """
+ Get number of events marked on the meter.
-# test metrics module
-test_module "metrics"
-
-# test table module
-test_module "table"
+ .. versionadded:: 1.11.0
+ """
+ from apache_beam.metrics.execution import MetricsEnvironment
+ container = MetricsEnvironment.current_container()
+ return container.get_counter(self._inner_counter.metric_name).get_cumulative()
diff --git a/flink-python/pyflink/fn_execution/metrics/process/metric_impl.py b/flink-python/pyflink/fn_execution/metrics/process/metric_impl.py
new file mode 100644
index 00000000000..c48dfbfcc37
--- /dev/null
+++ b/flink-python/pyflink/fn_execution/metrics/process/metric_impl.py
@@ -0,0 +1,103 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import json
+from enum import Enum
+from typing import Callable, List, Tuple
+
+from pyflink.fn_execution.metrics.process.counter_impl import CounterImpl
+from pyflink.fn_execution.metrics.process.distribution_impl import DistributionImpl
+from pyflink.fn_execution.metrics.process.meter_impl import MeterImpl
+from pyflink.metrics import MetricGroup, Counter, Distribution, Meter
+
+
+class MetricGroupType(Enum):
+ """
+ Indicate the type of MetricGroup.
+ """
+ generic = 0
+ key = 1
+ value = 2
+
+
+class GenericMetricGroup(MetricGroup):
+
+ def __init__(
+ self,
+ parent,
+ name,
+ metric_group_type=MetricGroupType.generic):
+ self._parent = parent
+ self._sub_groups = []
+ self._name = name
+ self._metric_group_type = metric_group_type
+ self._flink_gauge = {}
+ self._beam_gauge = {}
+
+ def add_group(self, name: str, extra: str = None) -> 'MetricGroup':
+ if extra is None:
+ return self._add_group(name, MetricGroupType.generic)
+ else:
+ return self._add_group(name, MetricGroupType.key) \
+ ._add_group(extra, MetricGroupType.value)
+
+ def counter(self, name: str) -> 'Counter':
+ from apache_beam.metrics.metric import Metrics
+ return CounterImpl(Metrics.counter(self._get_namespace(), name))
+
+ def gauge(self, name: str, obj: Callable[[], int]) -> None:
+ from apache_beam.metrics.metric import Metrics
+ self._flink_gauge[name] = obj
+ self._beam_gauge[name] = Metrics.gauge(self._get_namespace(), name)
+
+ def meter(self, name: str, time_span_in_seconds: int = 60) -> 'Meter':
+ from apache_beam.metrics.metric import Metrics
+ # There is no meter type in Beam, use counter to implement meter
+ return MeterImpl(Metrics.counter(self._get_namespace(time_span_in_seconds), name))
+
+ def distribution(self, name: str) -> 'Distribution':
+ from apache_beam.metrics.metric import Metrics
+ return DistributionImpl(Metrics.distribution(self._get_namespace(), name))
+
+ def _add_group(self, name: str, metric_group_type: MetricGroupType) -> 'GenericMetricGroup':
+ for group in self._sub_groups:
+ if name == group._name and metric_group_type == group._metric_group_type:
+ # we don't create same metric group repeatedly
+ return group
+
+ sub_group = GenericMetricGroup(
+ self,
+ name,
+ metric_group_type)
+ self._sub_groups.append(sub_group)
+ return sub_group
+
+ def _get_metric_group_names_and_types(self) -> Tuple[List[str], List[str]]:
+ if self._name is None:
+ return [], []
+ else:
+ names, types = self._parent._get_metric_group_names_and_types()
+ names.append(self._name)
+ types.append(str(self._metric_group_type))
+ return names, types
+
+ def _get_namespace(self, time=None) -> str:
+ names, metric_group_type = self._get_metric_group_names_and_types()
+ names.extend(metric_group_type)
+ if time is not None:
+ names.append(str(time))
+ return json.dumps(names)
diff --git a/flink-python/pyflink/metrics/tests/__init__.py b/flink-python/pyflink/fn_execution/metrics/tests/__init__.py
similarity index 100%
rename from flink-python/pyflink/metrics/tests/__init__.py
rename to flink-python/pyflink/fn_execution/metrics/tests/__init__.py
diff --git a/flink-python/pyflink/metrics/tests/test_metric.py b/flink-python/pyflink/fn_execution/metrics/tests/test_metric.py
similarity index 96%
rename from flink-python/pyflink/metrics/tests/test_metric.py
rename to flink-python/pyflink/fn_execution/metrics/tests/test_metric.py
index 543ed4f2154..ea1f4eb635d 100644
--- a/flink-python/pyflink/metrics/tests/test_metric.py
+++ b/flink-python/pyflink/fn_execution/metrics/tests/test_metric.py
@@ -15,17 +15,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-
-from pyflink.metrics.metricbase import GenericMetricGroup, MetricGroup
-from pyflink.table import FunctionContext
-
-from apache_beam.runners.worker import statesampler
-from apache_beam.utils import counters
+from apache_beam.metrics.cells import DistributionData
from apache_beam.metrics.execution import MetricsContainer
from apache_beam.metrics.execution import MetricsEnvironment
from apache_beam.metrics.metricbase import MetricName
-from apache_beam.metrics.cells import DistributionData
+from apache_beam.runners.worker import statesampler
+from apache_beam.utils import counters
+from pyflink.fn_execution.metrics.process.metric_impl import GenericMetricGroup
+from pyflink.metrics.metricbase import MetricGroup
+from pyflink.table import FunctionContext
from pyflink.testing.test_case_utils import PyFlinkTestCase
diff --git a/flink-python/pyflink/fn_execution/table/operations.py b/flink-python/pyflink/fn_execution/table/operations.py
index 3a77f25497d..4534d3a6849 100644
--- a/flink-python/pyflink/fn_execution/table/operations.py
+++ b/flink-python/pyflink/fn_execution/table/operations.py
@@ -34,7 +34,7 @@ from pyflink.fn_execution.table.window_trigger import EventTimeTrigger, Processi
CountTrigger
from pyflink.fn_execution.utils import operation_utils
from pyflink.fn_execution.utils.operation_utils import extract_user_defined_aggregate_function
-from pyflink.metrics.metricbase import GenericMetricGroup
+from pyflink.fn_execution.metrics.process.metric_impl import GenericMetricGroup
try:
from pyflink.fn_execution.table.aggregate_fast import RowKeySelector, \
diff --git a/flink-python/pyflink/metrics/metricbase.py b/flink-python/pyflink/metrics/metricbase.py
index c1d1d400d10..985f0447964 100644
--- a/flink-python/pyflink/metrics/metricbase.py
+++ b/flink-python/pyflink/metrics/metricbase.py
@@ -15,13 +15,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-import abc
-import json
-from enum import Enum
-from typing import Callable, Tuple, List
+from abc import ABC, abstractmethod
+from typing import Callable
-class MetricGroup(abc.ABC):
+class MetricGroup(ABC):
"""
A MetricGroup is a named container for metrics and further metric subgroups.
@@ -33,6 +31,7 @@ class MetricGroup(abc.ABC):
.. versionadded:: 1.11.0
"""
+ @abstractmethod
def add_group(self, name: str, extra: str = None) -> 'MetricGroup':
"""
Creates a new MetricGroup and adds it to this groups sub-groups.
@@ -46,6 +45,7 @@ class MetricGroup(abc.ABC):
"""
pass
+ @abstractmethod
def counter(self, name: str) -> 'Counter':
"""
Registers a new `Counter` with Flink.
@@ -54,6 +54,7 @@ class MetricGroup(abc.ABC):
"""
pass
+ @abstractmethod
def gauge(self, name: str, obj: Callable[[], int]) -> None:
"""
Registers a new `Gauge` with Flink.
@@ -62,15 +63,16 @@ class MetricGroup(abc.ABC):
"""
pass
+ @abstractmethod
def meter(self, name: str, time_span_in_seconds: int = 60) -> 'Meter':
"""
Registers a new `Meter` with Flink.
.. versionadded:: 1.11.0
"""
- # There is no meter type in Beam, use counter to implement meter
pass
+ @abstractmethod
def distribution(self, name: str) -> 'Distribution':
"""
Registers a new `Distribution` with Flink.
@@ -80,86 +82,7 @@ class MetricGroup(abc.ABC):
pass
-class MetricGroupType(Enum):
- """
- Indicate the type of MetricGroup.
- """
- generic = 0
- key = 1
- value = 2
-
-
-class GenericMetricGroup(MetricGroup):
-
- def __init__(
- self,
- parent,
- name,
- metric_group_type=MetricGroupType.generic):
- self._parent = parent
- self._sub_groups = []
- self._name = name
- self._metric_group_type = metric_group_type
- self._flink_gauge = {}
- self._beam_gauge = {}
-
- def _add_group(self, name: str, metric_group_type: MetricGroupType) \
- -> 'GenericMetricGroup':
- for group in self._sub_groups:
- if name == group._name and metric_group_type == group._metric_group_type:
- # we don't create same metric group repeatedly
- return group
-
- sub_group = GenericMetricGroup(
- self,
- name,
- metric_group_type)
- self._sub_groups.append(sub_group)
- return sub_group
-
- def add_group(self, name: str, extra: str = None) -> 'MetricGroup':
- if extra is None:
- return self._add_group(name, MetricGroupType.generic)
- else:
- return self._add_group(name, MetricGroupType.key)\
- ._add_group(extra, MetricGroupType.value)
-
- def counter(self, name: str) -> 'Counter':
- from apache_beam.metrics.metric import Metrics
- return Counter(Metrics.counter(self._get_namespace(), name))
-
- def gauge(self, name: str, obj: Callable[[], int]) -> None:
- from apache_beam.metrics.metric import Metrics
- self._flink_gauge[name] = obj
- self._beam_gauge[name] = Metrics.gauge(self._get_namespace(), name)
-
- def meter(self, name: str, time_span_in_seconds: int = 60) -> 'Meter':
- from apache_beam.metrics.metric import Metrics
- # There is no meter type in Beam, use counter to implement meter
- return Meter(Metrics.counter(self._get_namespace(time_span_in_seconds), name))
-
- def distribution(self, name: str) -> 'Distribution':
- from apache_beam.metrics.metric import Metrics
- return Distribution(Metrics.distribution(self._get_namespace(), name))
-
- def _get_metric_group_names_and_types(self) -> Tuple[List[str], List[str]]:
- if self._name is None:
- return [], []
- else:
- names, types = self._parent._get_metric_group_names_and_types()
- names.append(self._name)
- types.append(str(self._metric_group_type))
- return names, types
-
- def _get_namespace(self, time=None) -> str:
- names, metric_group_type = self._get_metric_group_names_and_types()
- names.extend(metric_group_type)
- if time is not None:
- names.append(str(time))
- return json.dumps(names)
-
-
-class Metric(object):
+class Metric(ABC):
"""
Base interface of a metric object.
@@ -168,7 +91,7 @@ class Metric(object):
pass
-class Counter(Metric):
+class Counter(Metric, ABC):
"""
Counter metric interface. Allows a count to be incremented/decremented
during pipeline execution.
@@ -176,59 +99,35 @@ class Counter(Metric):
.. versionadded:: 1.11.0
"""
- def __init__(self, inner_counter):
- self._inner_counter = inner_counter
-
+ @abstractmethod
def inc(self, n: int = 1):
"""
Increment the current count by the given value.
.. versionadded:: 1.11.0
"""
- self._inner_counter.inc(n)
+ pass
+ @abstractmethod
def dec(self, n: int = 1):
"""
Decrement the current count by 1.
.. versionadded:: 1.11.0
"""
- self.inc(-n)
+ pass
+ @abstractmethod
def get_count(self) -> int:
"""
Returns the current count.
.. versionadded:: 1.11.0
"""
- from apache_beam.metrics.execution import MetricsEnvironment
- container = MetricsEnvironment.current_container()
- return container.get_counter(self._inner_counter.metric_name).get_cumulative()
-
-
-class Distribution(Metric):
- """
- Distribution Metric interface.
-
- Allows statistics about the distribution of a variable to be collected during
- pipeline execution.
-
- .. versionadded:: 1.11.0
- """
-
- def __init__(self, inner_distribution):
- self._inner_distribution = inner_distribution
-
- def update(self, value):
- """
- Updates the distribution value.
-
- .. versionadded:: 1.11.0
- """
- self._inner_distribution.update(value)
+ pass
-class Meter(Metric):
+class Meter(Metric, ABC):
"""
Meter Metric interface.
@@ -237,23 +136,35 @@ class Meter(Metric):
.. versionadded:: 1.11.0
"""
- def __init__(self, inner_counter):
- self._inner_counter = inner_counter
-
+ @abstractmethod
def mark_event(self, value: int = 1):
"""
Mark occurrence of the specified number of events.
.. versionadded:: 1.11.0
"""
- self._inner_counter.inc(value)
+ pass
+ @abstractmethod
def get_count(self) -> int:
"""
Get number of events marked on the meter.
.. versionadded:: 1.11.0
"""
- from apache_beam.metrics.execution import MetricsEnvironment
- container = MetricsEnvironment.current_container()
- return container.get_counter(self._inner_counter.metric_name).get_cumulative()
+ pass
+
+
+class Distribution(Metric, ABC):
+ """
+ Distribution Metric interface.
+
+ Allows statistics about the distribution of a variable to be collected during
+ pipeline execution.
+
+ .. versionadded:: 1.11.0
+ """
+
+ @abstractmethod
+ def update(self, value):
+ pass
diff --git a/flink-python/setup.py b/flink-python/setup.py
index 82e5e9b4488..e524c369256 100644
--- a/flink-python/setup.py
+++ b/flink-python/setup.py
@@ -283,6 +283,9 @@ try:
'pyflink.fn_execution.datastream.window',
'pyflink.fn_execution.embedded',
'pyflink.fn_execution.formats',
+ 'pyflink.fn_execution.metrics',
+ 'pyflink.fn_execution.metrics.embedded',
+ 'pyflink.fn_execution.metrics.process',
'pyflink.fn_execution.table',
'pyflink.fn_execution.utils',
'pyflink.metrics',
diff --git a/flink-python/src/main/java/org/apache/flink/python/metric/embedded/MetricDistribution.java b/flink-python/src/main/java/org/apache/flink/python/metric/embedded/MetricDistribution.java
new file mode 100644
index 00000000000..5cd13889759
--- /dev/null
+++ b/flink-python/src/main/java/org/apache/flink/python/metric/embedded/MetricDistribution.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python.metric.embedded;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Gauge;
+
+/** Flink {@link Gauge} for Python Distribution. */
+@Internal
+public class MetricDistribution implements Gauge<Long> {
+
+ private long value;
+
+ public void update(long value) {
+ this.value = value;
+ }
+
+ @Override
+ public Long getValue() {
+ return value;
+ }
+}
diff --git a/flink-python/src/main/java/org/apache/flink/python/metric/embedded/MetricGauge.java b/flink-python/src/main/java/org/apache/flink/python/metric/embedded/MetricGauge.java
new file mode 100644
index 00000000000..afa4d683a4c
--- /dev/null
+++ b/flink-python/src/main/java/org/apache/flink/python/metric/embedded/MetricGauge.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python.metric.embedded;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Gauge;
+
+import pemja.core.object.PyObject;
+
+/** Flink {@link Gauge} for Python Gauge. */
+@Internal
+public class MetricGauge implements Gauge<Long> {
+
+ private final PyObject callable;
+
+ public MetricGauge(PyObject callable) {
+ this.callable = callable;
+ }
+
+ @Override
+ public Long getValue() {
+ return (Long) callable.invokeMethod("get_value");
+ }
+}
diff --git a/flink-python/src/main/java/org/apache/flink/python/metric/FlinkMetricContainer.java b/flink-python/src/main/java/org/apache/flink/python/metric/process/FlinkMetricContainer.java
similarity index 99%
rename from flink-python/src/main/java/org/apache/flink/python/metric/FlinkMetricContainer.java
rename to flink-python/src/main/java/org/apache/flink/python/metric/process/FlinkMetricContainer.java
index df2f84a283b..4475ca1af6b 100644
--- a/flink-python/src/main/java/org/apache/flink/python/metric/FlinkMetricContainer.java
+++ b/flink-python/src/main/java/org/apache/flink/python/metric/process/FlinkMetricContainer.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.python.metric;
+package org.apache.flink.python.metric.process;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.GlobalConfiguration;
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
index e4af2f405dd..2c8ee0ea4d6 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
@@ -21,7 +21,7 @@ package org.apache.flink.streaming.api.operators.python;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.python.env.PythonEnvironmentManager;
-import org.apache.flink.python.metric.FlinkMetricContainer;
+import org.apache.flink.python.metric.process.FlinkMetricContainer;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java
index b090ca98ab6..38644c79cb7 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java
@@ -22,7 +22,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
-import org.apache.flink.python.metric.FlinkMetricContainer;
+import org.apache.flink.python.metric.process.FlinkMetricContainer;
import org.apache.flink.python.util.ProtoUtils;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.state.KeyedStateBackend;
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
index 20b3d45a3f8..474b633b24d 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
@@ -30,7 +30,7 @@ import org.apache.flink.python.PythonOptions;
import org.apache.flink.python.env.PythonEnvironment;
import org.apache.flink.python.env.process.ProcessPythonEnvironment;
import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
-import org.apache.flink.python.metric.FlinkMetricContainer;
+import org.apache.flink.python.metric.process.FlinkMetricContainer;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.memory.OpaqueMemoryResource;
import org.apache.flink.runtime.state.KeyedStateBackend;
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonFunctionRunner.java
index 888c3c4f9be..ac340616ce2 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonFunctionRunner.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonFunctionRunner.java
@@ -22,7 +22,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
-import org.apache.flink.python.metric.FlinkMetricContainer;
+import org.apache.flink.python.metric.process.FlinkMetricContainer;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner;
diff --git a/flink-python/src/test/java/org/apache/flink/python/metric/FlinkMetricContainerTest.java b/flink-python/src/test/java/org/apache/flink/python/metric/process/FlinkMetricContainerTest.java
similarity index 99%
rename from flink-python/src/test/java/org/apache/flink/python/metric/FlinkMetricContainerTest.java
rename to flink-python/src/test/java/org/apache/flink/python/metric/process/FlinkMetricContainerTest.java
index a48bdd51f56..9645eac91fb 100644
--- a/flink-python/src/test/java/org/apache/flink/python/metric/FlinkMetricContainerTest.java
+++ b/flink-python/src/test/java/org/apache/flink/python/metric/process/FlinkMetricContainerTest.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.python.metric;
+package org.apache.flink.python.metric.process;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.metrics.Meter;
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonAggregateFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonAggregateFunctionRunner.java
index 2e9abd43fe3..a2ef65b7f5f 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonAggregateFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonAggregateFunctionRunner.java
@@ -26,7 +26,7 @@ import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
-import org.apache.flink.python.metric.FlinkMetricContainer;
+import org.apache.flink.python.metric.process.FlinkMetricContainer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.arrow.serializers.ArrowSerializer;
import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctionRunner;
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java
index 661805a570a..599fb9d511b 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.runtime.utils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
-import org.apache.flink.python.metric.FlinkMetricContainer;
+import org.apache.flink.python.metric.process.FlinkMetricContainer;
import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctionRunner;
import org.apache.flink.table.types.logical.RowType;
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java
index e7bca6e3032..2125a1fd1b3 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.runtime.utils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
-import org.apache.flink.python.metric.FlinkMetricContainer;
+import org.apache.flink.python.metric.process.FlinkMetricContainer;
import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctionRunner;
import org.apache.flink.table.types.logical.RowType;
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamAggregatePythonFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamAggregatePythonFunctionRunner.java
index e4a75d52f26..7c72e623bfa 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamAggregatePythonFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamAggregatePythonFunctionRunner.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
-import org.apache.flink.python.metric.FlinkMetricContainer;
+import org.apache.flink.python.metric.process.FlinkMetricContainer;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctionRunner;
import org.apache.flink.table.types.logical.RowType;
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamGroupWindowAggregatePythonFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamGroupWindowAggregatePythonFunctionRunner.java
index 4ab78479282..39581f170d0 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamGroupWindowAggregatePythonFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamGroupWindowAggregatePythonFunctionRunner.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.runtime.utils;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
-import org.apache.flink.python.metric.FlinkMetricContainer;
+import org.apache.flink.python.metric.process.FlinkMetricContainer;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.table.runtime.operators.python.aggregate.PassThroughPythonStreamGroupWindowAggregateOperator;
import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctionRunner;
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamTableAggregatePythonFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamTableAggregatePythonFunctionRunner.java
index 28c24170281..f8b79db3a80 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamTableAggregatePythonFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamTableAggregatePythonFunctionRunner.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
-import org.apache.flink.python.metric.FlinkMetricContainer;
+import org.apache.flink.python.metric.process.FlinkMetricContainer;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctionRunner;
import org.apache.flink.table.types.logical.RowType;
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PythonTestUtils.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PythonTestUtils.java
index 932d9fa9dfa..9054e71a0c5 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PythonTestUtils.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PythonTestUtils.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.runtime.utils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.python.env.PythonDependencyInfo;
import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
-import org.apache.flink.python.metric.FlinkMetricContainer;
+import org.apache.flink.python.metric.process.FlinkMetricContainer;
import org.apache.flink.python.util.PythonEnvironmentManagerUtils;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.metrics.groups.GenericMetricGroup;