You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by di...@apache.org on 2023/01/12 09:13:56 UTC

[flink] branch master updated: [FLINK-14023][python] Support accessing job parameters in Python user-defined functions

This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 5cb434c36da [FLINK-14023][python] Support accessing job parameters in Python user-defined functions
5cb434c36da is described below

commit 5cb434c36dab9b3d1c73d0407fe09248c96de63c
Author: Dian Fu <di...@apache.org>
AuthorDate: Wed Jan 11 15:31:33 2023 +0800

    [FLINK-14023][python] Support accessing job parameters in Python user-defined functions
    
    This closes #21644.
---
 .../docs/dev/python/table/udfs/overview.md         |  30 +++
 .../content/docs/dev/python/table/udfs/overview.md |  30 +++
 .../pyflink/fn_execution/flink_fn_execution_pb2.py | 243 ++++++++++-----------
 .../fn_execution/metrics/tests/test_metric.py      |   2 +-
 .../pyflink/fn_execution/table/operations.py       |   6 +-
 .../pyflink/proto/flink-fn-execution.proto         |  12 +-
 flink-python/pyflink/table/tests/test_udf.py       |  29 ++-
 flink-python/pyflink/table/udf.py                  |  15 +-
 .../org/apache/flink/python/util/ProtoUtils.java   |  17 +-
 .../AbstractPythonStreamAggregateOperator.java     |  10 +
 ...stractArrowPythonAggregateFunctionOperator.java |   1 +
 ...wPythonOverWindowAggregateFunctionOperator.java |  11 +
 .../AbstractPythonScalarFunctionOperator.java      |   1 +
 .../EmbeddedPythonScalarFunctionOperator.java      |   1 +
 .../table/EmbeddedPythonTableFunctionOperator.java |   1 +
 .../python/table/PythonTableFunctionOperator.java  |   1 +
 16 files changed, 269 insertions(+), 141 deletions(-)

diff --git a/docs/content.zh/docs/dev/python/table/udfs/overview.md b/docs/content.zh/docs/dev/python/table/udfs/overview.md
index 741e5960ba7..4e69ba92764 100644
--- a/docs/content.zh/docs/dev/python/table/udfs/overview.md
+++ b/docs/content.zh/docs/dev/python/table/udfs/overview.md
@@ -56,6 +56,36 @@ class Predict(ScalarFunction):
 predict = udf(Predict(), result_type=DataTypes.DOUBLE(), func_type="pandas")
 ```
 
+## 访问作业参数
+
+The `open()` method provides a `FunctionContext` that contains information about the context in which
+user-defined functions are executed, such as the metric group, the global job parameters, etc.
+
+The following information can be obtained by calling the corresponding methods of `FunctionContext`:
+
+| Method                                   | Description                                                             |
+| :--------------------------------------- | :---------------------------------------------------------------------- |
+| `get_metric_group()`                       | Metric group for this parallel subtask.                                 |
+| `get_job_parameter(name, default_value)`    | Global job parameter value associated with given key.                   |
+
+```python
+class HashCode(ScalarFunction):
+
+    def open(self, function_context: FunctionContext):
+        # access the global "hashcode_factor" parameter
+        # "12" would be the default value if the parameter does not exist
+        self.factor = int(function_context.get_job_parameter("hashcode_factor", "12"))
+
+    def eval(self, s: str):
+        return hash(s) * self.factor
+
+hash_code = udf(HashCode(), result_type=DataTypes.INT())
+t_env = TableEnvironment.create(...)
+t_env.get_config().set('pipeline.global-job-parameters', 'hashcode_factor:31')
+t_env.create_temporary_system_function("hashCode", hash_code)
+t_env.sql_query("SELECT myField, hashCode(myField) FROM MyTable")
+```
+
 ## 测试自定义函数
 
 假如你定义了如下 Python 自定义函数:
diff --git a/docs/content/docs/dev/python/table/udfs/overview.md b/docs/content/docs/dev/python/table/udfs/overview.md
index 97358acd238..b416a28ee98 100644
--- a/docs/content/docs/dev/python/table/udfs/overview.md
+++ b/docs/content/docs/dev/python/table/udfs/overview.md
@@ -62,6 +62,36 @@ class Predict(ScalarFunction):
 predict = udf(Predict(), result_type=DataTypes.DOUBLE(), func_type="pandas")
 ```
 
+## Accessing job parameters
+
+The `open()` method provides a `FunctionContext` that contains information about the context in which
+user-defined functions are executed, such as the metric group, the global job parameters, etc.
+
+The following information can be obtained by calling the corresponding methods of `FunctionContext`:
+
+| Method                                   | Description                                                             |
+| :--------------------------------------- | :---------------------------------------------------------------------- |
+| `get_metric_group()`                       | Metric group for this parallel subtask.                                 |
+| `get_job_parameter(name, default_value)`    | Global job parameter value associated with given key.                   |
+
+```python
+class HashCode(ScalarFunction):
+
+    def open(self, function_context: FunctionContext):
+        # access the global "hashcode_factor" parameter
+        # "12" would be the default value if the parameter does not exist
+        self.factor = int(function_context.get_job_parameter("hashcode_factor", "12"))
+
+    def eval(self, s: str):
+        return hash(s) * self.factor
+
+hash_code = udf(HashCode(), result_type=DataTypes.INT())
+t_env = TableEnvironment.create(...)
+t_env.get_config().set('pipeline.global-job-parameters', 'hashcode_factor:31')
+t_env.create_temporary_system_function("hashCode", hash_code)
+t_env.sql_query("SELECT myField, hashCode(myField) FROM MyTable")
+```
+
 ## Testing User-Defined Functions
 
 Suppose you have defined a Python user-defined function as following:
diff --git a/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py b/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py
index 95ca119137d..a2dc8575338 100644
--- a/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py
+++ b/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py
@@ -31,10 +31,11 @@ _sym_db = _symbol_database.Default()
 
 
 
-DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x18\x66link-fn-execution.proto\x12 org.apache.flink.fn_execution.v1\"\x86\x01\n\x05Input\x12\x44\n\x03udf\x18\x01 \x01(\x0b\x32\x35.org.apache.flink.fn_execution.v1.UserDefinedFunctionH\x00\x12\x15\n\x0binputOffset\x18\x02 \x01(\x05H\x00\x12\x17\n\rinputConstant\x18\x03 \x01(\x0cH\x00\x42\x07\n\x05input\"\xa8\x01\n\x13UserDefinedFunction\x12\x0f\n\x07payload\x18\x01 \x01(\x0c\x12\x37\n\x06inputs\x18\x02 \x03(\x0b\x32\'.org.ap [...]
+DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x18\x66link-fn-execution.proto\x12 org.apache.flink.fn_execution.v1\"*\n\x0cJobParameter\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t\"\x86\x01\n\x05Input\x12\x44\n\x03udf\x18\x01 \x01(\x0b\x32\x35.org.apache.flink.fn_execution.v1.UserDefinedFunctionH\x00\x12\x15\n\x0binputOffset\x18\x02 \x01(\x05H\x00\x12\x17\n\rinputConstant\x18\x03 \x01(\x0cH\x00\x42\x07\n\x05input\"\xa8\x01\n\x13UserDefinedFunction\x1 [...]
 
 
 
+_JOBPARAMETER = DESCRIPTOR.message_types_by_name['JobParameter']
 _INPUT = DESCRIPTOR.message_types_by_name['Input']
 _USERDEFINEDFUNCTION = DESCRIPTOR.message_types_by_name['UserDefinedFunction']
 _USERDEFINEDFUNCTIONS = DESCRIPTOR.message_types_by_name['UserDefinedFunctions']
@@ -65,7 +66,6 @@ _TYPEINFO_ROWTYPEINFO_FIELD = _TYPEINFO_ROWTYPEINFO.nested_types_by_name['Field'
 _TYPEINFO_TUPLETYPEINFO = _TYPEINFO.nested_types_by_name['TupleTypeInfo']
 _TYPEINFO_AVROTYPEINFO = _TYPEINFO.nested_types_by_name['AvroTypeInfo']
 _USERDEFINEDDATASTREAMFUNCTION = DESCRIPTOR.message_types_by_name['UserDefinedDataStreamFunction']
-_USERDEFINEDDATASTREAMFUNCTION_JOBPARAMETER = _USERDEFINEDDATASTREAMFUNCTION.nested_types_by_name['JobParameter']
 _USERDEFINEDDATASTREAMFUNCTION_RUNTIMECONTEXT = _USERDEFINEDDATASTREAMFUNCTION.nested_types_by_name['RuntimeContext']
 _STATEDESCRIPTOR = DESCRIPTOR.message_types_by_name['StateDescriptor']
 _STATEDESCRIPTOR_STATETTLCONFIG = _STATEDESCRIPTOR.nested_types_by_name['StateTTLConfig']
@@ -91,6 +91,13 @@ _STATEDESCRIPTOR_STATETTLCONFIG_UPDATETYPE = _STATEDESCRIPTOR_STATETTLCONFIG.enu
 _STATEDESCRIPTOR_STATETTLCONFIG_STATEVISIBILITY = _STATEDESCRIPTOR_STATETTLCONFIG.enum_types_by_name['StateVisibility']
 _STATEDESCRIPTOR_STATETTLCONFIG_TTLTIMECHARACTERISTIC = _STATEDESCRIPTOR_STATETTLCONFIG.enum_types_by_name['TtlTimeCharacteristic']
 _CODERINFODESCRIPTOR_MODE = _CODERINFODESCRIPTOR.enum_types_by_name['Mode']
+JobParameter = _reflection.GeneratedProtocolMessageType('JobParameter', (_message.Message,), {
+  'DESCRIPTOR' : _JOBPARAMETER,
+  '__module__' : 'flink_fn_execution_pb2'
+  # @@protoc_insertion_point(class_scope:org.apache.flink.fn_execution.v1.JobParameter)
+  })
+_sym_db.RegisterMessage(JobParameter)
+
 Input = _reflection.GeneratedProtocolMessageType('Input', (_message.Message,), {
   'DESCRIPTOR' : _INPUT,
   '__module__' : 'flink_fn_execution_pb2'
@@ -316,13 +323,6 @@ _sym_db.RegisterMessage(TypeInfo.AvroTypeInfo)
 
 UserDefinedDataStreamFunction = _reflection.GeneratedProtocolMessageType('UserDefinedDataStreamFunction', (_message.Message,), {
 
-  'JobParameter' : _reflection.GeneratedProtocolMessageType('JobParameter', (_message.Message,), {
-    'DESCRIPTOR' : _USERDEFINEDDATASTREAMFUNCTION_JOBPARAMETER,
-    '__module__' : 'flink_fn_execution_pb2'
-    # @@protoc_insertion_point(class_scope:org.apache.flink.fn_execution.v1.UserDefinedDataStreamFunction.JobParameter)
-    })
-  ,
-
   'RuntimeContext' : _reflection.GeneratedProtocolMessageType('RuntimeContext', (_message.Message,), {
     'DESCRIPTOR' : _USERDEFINEDDATASTREAMFUNCTION_RUNTIMECONTEXT,
     '__module__' : 'flink_fn_execution_pb2'
@@ -334,7 +334,6 @@ UserDefinedDataStreamFunction = _reflection.GeneratedProtocolMessageType('UserDe
   # @@protoc_insertion_point(class_scope:org.apache.flink.fn_execution.v1.UserDefinedDataStreamFunction)
   })
 _sym_db.RegisterMessage(UserDefinedDataStreamFunction)
-_sym_db.RegisterMessage(UserDefinedDataStreamFunction.JobParameter)
 _sym_db.RegisterMessage(UserDefinedDataStreamFunction.RuntimeContext)
 
 StateDescriptor = _reflection.GeneratedProtocolMessageType('StateDescriptor', (_message.Message,), {
@@ -435,116 +434,116 @@ if _descriptor._USE_C_DESCRIPTORS == False:
 
   DESCRIPTOR._options = None
   DESCRIPTOR._serialized_options = b'\n\037org.apache.flink.fnexecution.v1B\nFlinkFnApi'
-  _INPUT._serialized_start=63
-  _INPUT._serialized_end=197
-  _USERDEFINEDFUNCTION._serialized_start=200
-  _USERDEFINEDFUNCTION._serialized_end=368
-  _USERDEFINEDFUNCTIONS._serialized_start=371
-  _USERDEFINEDFUNCTIONS._serialized_end=574
-  _OVERWINDOW._serialized_start=577
-  _OVERWINDOW._serialized_end=926
-  _OVERWINDOW_WINDOWTYPE._serialized_start=718
-  _OVERWINDOW_WINDOWTYPE._serialized_end=926
-  _USERDEFINEDAGGREGATEFUNCTION._serialized_start=929
-  _USERDEFINEDAGGREGATEFUNCTION._serialized_end=1708
-  _USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC._serialized_start=1194
-  _USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC._serialized_end=1708
-  _USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC_LISTVIEW._serialized_start=1457
-  _USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC_LISTVIEW._serialized_end=1541
-  _USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC_MAPVIEW._serialized_start=1544
-  _USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC_MAPVIEW._serialized_end=1695
-  _GROUPWINDOW._serialized_start=1711
-  _GROUPWINDOW._serialized_end=2267
-  _GROUPWINDOW_WINDOWTYPE._serialized_start=2075
-  _GROUPWINDOW_WINDOWTYPE._serialized_end=2166
-  _GROUPWINDOW_WINDOWPROPERTY._serialized_start=2168
-  _GROUPWINDOW_WINDOWPROPERTY._serialized_end=2267
-  _USERDEFINEDAGGREGATEFUNCTIONS._serialized_start=2270
-  _USERDEFINEDAGGREGATEFUNCTIONS._serialized_end=2804
-  _SCHEMA._serialized_start=2807
-  _SCHEMA._serialized_end=4845
-  _SCHEMA_MAPINFO._serialized_start=2882
-  _SCHEMA_MAPINFO._serialized_end=3033
-  _SCHEMA_TIMEINFO._serialized_start=3035
-  _SCHEMA_TIMEINFO._serialized_end=3064
-  _SCHEMA_TIMESTAMPINFO._serialized_start=3066
-  _SCHEMA_TIMESTAMPINFO._serialized_end=3100
-  _SCHEMA_LOCALZONEDTIMESTAMPINFO._serialized_start=3102
-  _SCHEMA_LOCALZONEDTIMESTAMPINFO._serialized_end=3146
-  _SCHEMA_ZONEDTIMESTAMPINFO._serialized_start=3148
-  _SCHEMA_ZONEDTIMESTAMPINFO._serialized_end=3187
-  _SCHEMA_DECIMALINFO._serialized_start=3189
-  _SCHEMA_DECIMALINFO._serialized_end=3236
-  _SCHEMA_BINARYINFO._serialized_start=3238
-  _SCHEMA_BINARYINFO._serialized_end=3266
-  _SCHEMA_VARBINARYINFO._serialized_start=3268
-  _SCHEMA_VARBINARYINFO._serialized_end=3299
-  _SCHEMA_CHARINFO._serialized_start=3301
-  _SCHEMA_CHARINFO._serialized_end=3327
-  _SCHEMA_VARCHARINFO._serialized_start=3329
-  _SCHEMA_VARCHARINFO._serialized_end=3358
-  _SCHEMA_FIELDTYPE._serialized_start=3361
-  _SCHEMA_FIELDTYPE._serialized_end=4433
-  _SCHEMA_FIELD._serialized_start=4435
-  _SCHEMA_FIELD._serialized_end=4543
-  _SCHEMA_TYPENAME._serialized_start=4546
-  _SCHEMA_TYPENAME._serialized_end=4845
-  _TYPEINFO._serialized_start=4848
-  _TYPEINFO._serialized_end=6195
-  _TYPEINFO_MAPTYPEINFO._serialized_start=5342
-  _TYPEINFO_MAPTYPEINFO._serialized_end=5481
-  _TYPEINFO_ROWTYPEINFO._serialized_start=5484
-  _TYPEINFO_ROWTYPEINFO._serialized_end=5668
-  _TYPEINFO_ROWTYPEINFO_FIELD._serialized_start=5577
-  _TYPEINFO_ROWTYPEINFO_FIELD._serialized_end=5668
-  _TYPEINFO_TUPLETYPEINFO._serialized_start=5670
-  _TYPEINFO_TUPLETYPEINFO._serialized_end=5750
-  _TYPEINFO_AVROTYPEINFO._serialized_start=5752
-  _TYPEINFO_AVROTYPEINFO._serialized_end=5782
-  _TYPEINFO_TYPENAME._serialized_start=5785
-  _TYPEINFO_TYPENAME._serialized_end=6182
-  _USERDEFINEDDATASTREAMFUNCTION._serialized_start=6198
-  _USERDEFINEDDATASTREAMFUNCTION._serialized_end=7249
-  _USERDEFINEDDATASTREAMFUNCTION_JOBPARAMETER._serialized_start=6692
-  _USERDEFINEDDATASTREAMFUNCTION_JOBPARAMETER._serialized_end=6734
-  _USERDEFINEDDATASTREAMFUNCTION_RUNTIMECONTEXT._serialized_start=6737
-  _USERDEFINEDDATASTREAMFUNCTION_RUNTIMECONTEXT._serialized_end=7073
-  _USERDEFINEDDATASTREAMFUNCTION_FUNCTIONTYPE._serialized_start=7076
-  _USERDEFINEDDATASTREAMFUNCTION_FUNCTIONTYPE._serialized_end=7249
-  _STATEDESCRIPTOR._serialized_start=7252
-  _STATEDESCRIPTOR._serialized_end=9144
-  _STATEDESCRIPTOR_STATETTLCONFIG._serialized_start=7384
-  _STATEDESCRIPTOR_STATETTLCONFIG._serialized_end=9144
-  _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES._serialized_start=7855
-  _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES._serialized_end=8953
-  _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_INCREMENTALCLEANUPSTRATEGY._serialized_start=8033
-  _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_INCREMENTALCLEANUPSTRATEGY._serialized_end=8121
-  _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_ROCKSDBCOMPACTFILTERCLEANUPSTRATEGY._serialized_start=8123
-  _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_ROCKSDBCOMPACTFILTERCLEANUPSTRATEGY._serialized_end=8198
-  _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_MAPSTRATEGIESENTRY._serialized_start=8201
-  _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_MAPSTRATEGIESENTRY._serialized_end=8809
-  _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_STRATEGIES._serialized_start=8811
-  _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_STRATEGIES._serialized_end=8909
-  _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_EMPTYCLEANUPSTRATEGY._serialized_start=8911
-  _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_EMPTYCLEANUPSTRATEGY._serialized_end=8953
-  _STATEDESCRIPTOR_STATETTLCONFIG_UPDATETYPE._serialized_start=8955
-  _STATEDESCRIPTOR_STATETTLCONFIG_UPDATETYPE._serialized_end=9023
-  _STATEDESCRIPTOR_STATETTLCONFIG_STATEVISIBILITY._serialized_start=9025
-  _STATEDESCRIPTOR_STATETTLCONFIG_STATEVISIBILITY._serialized_end=9099
-  _STATEDESCRIPTOR_STATETTLCONFIG_TTLTIMECHARACTERISTIC._serialized_start=9101
-  _STATEDESCRIPTOR_STATETTLCONFIG_TTLTIMECHARACTERISTIC._serialized_end=9144
-  _CODERINFODESCRIPTOR._serialized_start=9147
-  _CODERINFODESCRIPTOR._serialized_end=10156
-  _CODERINFODESCRIPTOR_FLATTENROWTYPE._serialized_start=9740
-  _CODERINFODESCRIPTOR_FLATTENROWTYPE._serialized_end=9814
-  _CODERINFODESCRIPTOR_ROWTYPE._serialized_start=9816
-  _CODERINFODESCRIPTOR_ROWTYPE._serialized_end=9883
-  _CODERINFODESCRIPTOR_ARROWTYPE._serialized_start=9885
-  _CODERINFODESCRIPTOR_ARROWTYPE._serialized_end=9954
-  _CODERINFODESCRIPTOR_OVERWINDOWARROWTYPE._serialized_start=9956
-  _CODERINFODESCRIPTOR_OVERWINDOWARROWTYPE._serialized_end=10035
-  _CODERINFODESCRIPTOR_RAWTYPE._serialized_start=10037
-  _CODERINFODESCRIPTOR_RAWTYPE._serialized_end=10109
-  _CODERINFODESCRIPTOR_MODE._serialized_start=10111
-  _CODERINFODESCRIPTOR_MODE._serialized_end=10143
+  _JOBPARAMETER._serialized_start=62
+  _JOBPARAMETER._serialized_end=104
+  _INPUT._serialized_start=107
+  _INPUT._serialized_end=241
+  _USERDEFINEDFUNCTION._serialized_start=244
+  _USERDEFINEDFUNCTION._serialized_end=412
+  _USERDEFINEDFUNCTIONS._serialized_start=415
+  _USERDEFINEDFUNCTIONS._serialized_end=690
+  _OVERWINDOW._serialized_start=693
+  _OVERWINDOW._serialized_end=1042
+  _OVERWINDOW_WINDOWTYPE._serialized_start=834
+  _OVERWINDOW_WINDOWTYPE._serialized_end=1042
+  _USERDEFINEDAGGREGATEFUNCTION._serialized_start=1045
+  _USERDEFINEDAGGREGATEFUNCTION._serialized_end=1824
+  _USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC._serialized_start=1310
+  _USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC._serialized_end=1824
+  _USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC_LISTVIEW._serialized_start=1573
+  _USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC_LISTVIEW._serialized_end=1657
+  _USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC_MAPVIEW._serialized_start=1660
+  _USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC_MAPVIEW._serialized_end=1811
+  _GROUPWINDOW._serialized_start=1827
+  _GROUPWINDOW._serialized_end=2383
+  _GROUPWINDOW_WINDOWTYPE._serialized_start=2191
+  _GROUPWINDOW_WINDOWTYPE._serialized_end=2282
+  _GROUPWINDOW_WINDOWPROPERTY._serialized_start=2284
+  _GROUPWINDOW_WINDOWPROPERTY._serialized_end=2383
+  _USERDEFINEDAGGREGATEFUNCTIONS._serialized_start=2386
+  _USERDEFINEDAGGREGATEFUNCTIONS._serialized_end=2992
+  _SCHEMA._serialized_start=2995
+  _SCHEMA._serialized_end=5033
+  _SCHEMA_MAPINFO._serialized_start=3070
+  _SCHEMA_MAPINFO._serialized_end=3221
+  _SCHEMA_TIMEINFO._serialized_start=3223
+  _SCHEMA_TIMEINFO._serialized_end=3252
+  _SCHEMA_TIMESTAMPINFO._serialized_start=3254
+  _SCHEMA_TIMESTAMPINFO._serialized_end=3288
+  _SCHEMA_LOCALZONEDTIMESTAMPINFO._serialized_start=3290
+  _SCHEMA_LOCALZONEDTIMESTAMPINFO._serialized_end=3334
+  _SCHEMA_ZONEDTIMESTAMPINFO._serialized_start=3336
+  _SCHEMA_ZONEDTIMESTAMPINFO._serialized_end=3375
+  _SCHEMA_DECIMALINFO._serialized_start=3377
+  _SCHEMA_DECIMALINFO._serialized_end=3424
+  _SCHEMA_BINARYINFO._serialized_start=3426
+  _SCHEMA_BINARYINFO._serialized_end=3454
+  _SCHEMA_VARBINARYINFO._serialized_start=3456
+  _SCHEMA_VARBINARYINFO._serialized_end=3487
+  _SCHEMA_CHARINFO._serialized_start=3489
+  _SCHEMA_CHARINFO._serialized_end=3515
+  _SCHEMA_VARCHARINFO._serialized_start=3517
+  _SCHEMA_VARCHARINFO._serialized_end=3546
+  _SCHEMA_FIELDTYPE._serialized_start=3549
+  _SCHEMA_FIELDTYPE._serialized_end=4621
+  _SCHEMA_FIELD._serialized_start=4623
+  _SCHEMA_FIELD._serialized_end=4731
+  _SCHEMA_TYPENAME._serialized_start=4734
+  _SCHEMA_TYPENAME._serialized_end=5033
+  _TYPEINFO._serialized_start=5036
+  _TYPEINFO._serialized_end=6383
+  _TYPEINFO_MAPTYPEINFO._serialized_start=5530
+  _TYPEINFO_MAPTYPEINFO._serialized_end=5669
+  _TYPEINFO_ROWTYPEINFO._serialized_start=5672
+  _TYPEINFO_ROWTYPEINFO._serialized_end=5856
+  _TYPEINFO_ROWTYPEINFO_FIELD._serialized_start=5765
+  _TYPEINFO_ROWTYPEINFO_FIELD._serialized_end=5856
+  _TYPEINFO_TUPLETYPEINFO._serialized_start=5858
+  _TYPEINFO_TUPLETYPEINFO._serialized_end=5938
+  _TYPEINFO_AVROTYPEINFO._serialized_start=5940
+  _TYPEINFO_AVROTYPEINFO._serialized_end=5970
+  _TYPEINFO_TYPENAME._serialized_start=5973
+  _TYPEINFO_TYPENAME._serialized_end=6370
+  _USERDEFINEDDATASTREAMFUNCTION._serialized_start=6386
+  _USERDEFINEDDATASTREAMFUNCTION._serialized_end=7363
+  _USERDEFINEDDATASTREAMFUNCTION_RUNTIMECONTEXT._serialized_start=6881
+  _USERDEFINEDDATASTREAMFUNCTION_RUNTIMECONTEXT._serialized_end=7187
+  _USERDEFINEDDATASTREAMFUNCTION_FUNCTIONTYPE._serialized_start=7190
+  _USERDEFINEDDATASTREAMFUNCTION_FUNCTIONTYPE._serialized_end=7363
+  _STATEDESCRIPTOR._serialized_start=7366
+  _STATEDESCRIPTOR._serialized_end=9258
+  _STATEDESCRIPTOR_STATETTLCONFIG._serialized_start=7498
+  _STATEDESCRIPTOR_STATETTLCONFIG._serialized_end=9258
+  _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES._serialized_start=7969
+  _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES._serialized_end=9067
+  _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_INCREMENTALCLEANUPSTRATEGY._serialized_start=8147
+  _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_INCREMENTALCLEANUPSTRATEGY._serialized_end=8235
+  _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_ROCKSDBCOMPACTFILTERCLEANUPSTRATEGY._serialized_start=8237
+  _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_ROCKSDBCOMPACTFILTERCLEANUPSTRATEGY._serialized_end=8312
+  _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_MAPSTRATEGIESENTRY._serialized_start=8315
+  _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_MAPSTRATEGIESENTRY._serialized_end=8923
+  _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_STRATEGIES._serialized_start=8925
+  _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_STRATEGIES._serialized_end=9023
+  _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_EMPTYCLEANUPSTRATEGY._serialized_start=9025
+  _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_EMPTYCLEANUPSTRATEGY._serialized_end=9067
+  _STATEDESCRIPTOR_STATETTLCONFIG_UPDATETYPE._serialized_start=9069
+  _STATEDESCRIPTOR_STATETTLCONFIG_UPDATETYPE._serialized_end=9137
+  _STATEDESCRIPTOR_STATETTLCONFIG_STATEVISIBILITY._serialized_start=9139
+  _STATEDESCRIPTOR_STATETTLCONFIG_STATEVISIBILITY._serialized_end=9213
+  _STATEDESCRIPTOR_STATETTLCONFIG_TTLTIMECHARACTERISTIC._serialized_start=9215
+  _STATEDESCRIPTOR_STATETTLCONFIG_TTLTIMECHARACTERISTIC._serialized_end=9258
+  _CODERINFODESCRIPTOR._serialized_start=9261
+  _CODERINFODESCRIPTOR._serialized_end=10270
+  _CODERINFODESCRIPTOR_FLATTENROWTYPE._serialized_start=9854
+  _CODERINFODESCRIPTOR_FLATTENROWTYPE._serialized_end=9928
+  _CODERINFODESCRIPTOR_ROWTYPE._serialized_start=9930
+  _CODERINFODESCRIPTOR_ROWTYPE._serialized_end=9997
+  _CODERINFODESCRIPTOR_ARROWTYPE._serialized_start=9999
+  _CODERINFODESCRIPTOR_ARROWTYPE._serialized_end=10068
+  _CODERINFODESCRIPTOR_OVERWINDOWARROWTYPE._serialized_start=10070
+  _CODERINFODESCRIPTOR_OVERWINDOWARROWTYPE._serialized_end=10149
+  _CODERINFODESCRIPTOR_RAWTYPE._serialized_start=10151
+  _CODERINFODESCRIPTOR_RAWTYPE._serialized_end=10223
+  _CODERINFODESCRIPTOR_MODE._serialized_start=10225
+  _CODERINFODESCRIPTOR_MODE._serialized_end=10257
 # @@protoc_insertion_point(module_scope)
diff --git a/flink-python/pyflink/fn_execution/metrics/tests/test_metric.py b/flink-python/pyflink/fn_execution/metrics/tests/test_metric.py
index ea1f4eb635d..7fba25cb87e 100644
--- a/flink-python/pyflink/fn_execution/metrics/tests/test_metric.py
+++ b/flink-python/pyflink/fn_execution/metrics/tests/test_metric.py
@@ -48,7 +48,7 @@ class MetricTests(PyFlinkTestCase):
         self.assertEqual(MetricTests.print_metric_group_path(new_group), 'root.key.value')
 
     def test_metric_not_enabled(self):
-        fc = FunctionContext(None)
+        fc = FunctionContext(None, None)
         with self.assertRaises(RuntimeError):
             fc.get_metric_group()
 
diff --git a/flink-python/pyflink/fn_execution/table/operations.py b/flink-python/pyflink/fn_execution/table/operations.py
index 4534d3a6849..8a0924dccdc 100644
--- a/flink-python/pyflink/fn_execution/table/operations.py
+++ b/flink-python/pyflink/fn_execution/table/operations.py
@@ -83,6 +83,7 @@ class BaseOperation(Operation):
         else:
             self.base_metric_group = None
         self.func, self.user_defined_funcs = self.generate_func(serialized_fn)
+        self.job_parameters = {p.key: p.value for p in serialized_fn.job_parameters}
 
     def finish(self):
         self._update_gauge(self.base_metric_group)
@@ -102,7 +103,7 @@ class BaseOperation(Operation):
     def open(self):
         for user_defined_func in self.user_defined_funcs:
             if hasattr(user_defined_func, 'open'):
-                user_defined_func.open(FunctionContext(self.base_metric_group))
+                user_defined_func.open(FunctionContext(self.base_metric_group, self.job_parameters))
 
     def close(self):
         for user_defined_func in self.user_defined_funcs:
@@ -323,11 +324,12 @@ class AbstractStreamGroupAggregateOperation(BaseStatefulOperation):
         self.state_cache_size = serialized_fn.state_cache_size
         self.state_cleaning_enabled = serialized_fn.state_cleaning_enabled
         self.data_view_specs = extract_data_view_specs(serialized_fn.udfs)
+        self.job_parameters = {p.key: p.value for p in serialized_fn.job_parameters}
         super(AbstractStreamGroupAggregateOperation, self).__init__(
             serialized_fn, keyed_state_backend)
 
     def open(self):
-        self.group_agg_function.open(FunctionContext(self.base_metric_group))
+        self.group_agg_function.open(FunctionContext(self.base_metric_group, self.job_parameters))
 
     def close(self):
         self.group_agg_function.close()
diff --git a/flink-python/pyflink/proto/flink-fn-execution.proto b/flink-python/pyflink/proto/flink-fn-execution.proto
index db27aed7409..468b8524595 100644
--- a/flink-python/pyflink/proto/flink-fn-execution.proto
+++ b/flink-python/pyflink/proto/flink-fn-execution.proto
@@ -25,6 +25,11 @@ package org.apache.flink.fn_execution.v1;
 option java_package = "org.apache.flink.fnexecution.v1";
 option java_outer_classname = "FlinkFnApi";
 
+message JobParameter {
+  string key = 1;
+  string value = 2;
+}
+
 // ------------------------------------------------------------------------
 // Table API & SQL
 // ------------------------------------------------------------------------
@@ -65,6 +70,7 @@ message UserDefinedFunctions {
   bool metric_enabled = 2;
   repeated OverWindow windows = 3;
   bool profile_enabled = 4;
+  repeated JobParameter job_parameters = 5;
 }
 
 // Used to describe the info of over window in pandas batch over window aggregation
@@ -182,6 +188,7 @@ message UserDefinedAggregateFunctions {
   GroupWindow group_window = 12;
 
   bool profile_enabled = 13;
+  repeated JobParameter job_parameters = 14;
 }
 
 // A representation of the data schema.
@@ -362,11 +369,6 @@ message UserDefinedDataStreamFunction {
     REVISE_OUTPUT = 100;
   }
 
-  message JobParameter {
-    string key = 1;
-    string value = 2;
-  }
-
   message RuntimeContext {
     string task_name = 1;
     string task_name_with_subtasks = 2;
diff --git a/flink-python/pyflink/table/tests/test_udf.py b/flink-python/pyflink/table/tests/test_udf.py
index 699786b73d2..6f249e5dde5 100644
--- a/flink-python/pyflink/table/tests/test_udf.py
+++ b/flink-python/pyflink/table/tests/test_udf.py
@@ -26,7 +26,7 @@ import pytz
 
 from pyflink.table import DataTypes, expressions as expr
 from pyflink.table.expressions import call
-from pyflink.table.udf import ScalarFunction, udf
+from pyflink.table.udf import ScalarFunction, udf, FunctionContext
 from pyflink.testing import source_sink_utils
 from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase, \
     PyFlinkBatchTableTestCase
@@ -41,12 +41,15 @@ class UserDefinedFunctionTests(object):
     def test_scalar_function(self):
         # test metric disabled.
         self.t_env.get_config().set('python.metric.enabled', 'false')
+        self.t_env.get_config().set('pipeline.global-job-parameters', 'subtract_value:2')
         # test lambda function
         add_one = udf(lambda i: i + 1, result_type=DataTypes.BIGINT())
 
         # test Python ScalarFunction
         subtract_one = udf(SubtractOne(), result_type=DataTypes.BIGINT())
 
+        subtract_two = udf(SubtractWithParameters(), result_type=DataTypes.BIGINT())
+
         # test callable function
         add_one_callable = udf(CallablePlus(), result_type=DataTypes.BIGINT())
 
@@ -68,7 +71,7 @@ class UserDefinedFunctionTests(object):
         sink_table = generate_random_table_name()
         sink_table_ddl = f"""
             CREATE TABLE {sink_table}(a BIGINT, b BIGINT, c BIGINT, d BIGINT, e BIGINT, f BIGINT,
-             g BIGINT) WITH ('connector'='test-sink')
+             g BIGINT, h BIGINT) WITH ('connector'='test-sink')
         """
         self.t_env.execute_sql(sink_table_ddl)
 
@@ -76,11 +79,16 @@ class UserDefinedFunctionTests(object):
 
         t = self.t_env.from_elements([(1, 2, 3), (2, 5, 6), (3, 1, 9)], ['a', 'b', 'c'])
         t.where(add_one(t.b) <= 3).select(
-            add_one(t.a), subtract_one(t.b), add(t.a, t.c), add_one_callable(t.a),
-            add_one_partial(t.a), check_memory_limit(execution_mode), t.a) \
-            .execute_insert(sink_table).wait()
+            add_one(t.a),
+            subtract_one(t.b),
+            subtract_two(t.b),
+            add(t.a, t.c),
+            add_one_callable(t.a),
+            add_one_partial(t.a),
+            check_memory_limit(execution_mode),
+            t.a).execute_insert(sink_table).wait()
         actual = source_sink_utils.results()
-        self.assert_equals(actual, ["+I[2, 1, 4, 2, 2, 1, 1]", "+I[4, 0, 12, 4, 4, 1, 3]"])
+        self.assert_equals(actual, ["+I[2, 1, 0, 4, 2, 2, 1, 1]", "+I[4, 0, -1, 12, 4, 4, 1, 3]"])
 
     def test_chaining_scalar_function(self):
         add_one = udf(lambda i: i + 1, result_type=DataTypes.BIGINT())
@@ -1010,6 +1018,15 @@ class SubtractOne(ScalarFunction):
         return i - 1
 
 
+class SubtractWithParameters(ScalarFunction):
+
+    def open(self, function_context: FunctionContext):
+        self.subtract_value = int(function_context.get_job_parameter("subtract_value", "1"))
+
+    def eval(self, i):
+        return i - self.subtract_value
+
+
 class SubtractWithMetrics(ScalarFunction, unittest.TestCase):
 
     def open(self, function_context):
diff --git a/flink-python/pyflink/table/udf.py b/flink-python/pyflink/table/udf.py
index 07840423321..cd19fcfcef6 100644
--- a/flink-python/pyflink/table/udf.py
+++ b/flink-python/pyflink/table/udf.py
@@ -37,8 +37,9 @@ class FunctionContext(object):
     and global job parameters, etc.
     """
 
-    def __init__(self, base_metric_group):
+    def __init__(self, base_metric_group, job_parameters):
         self._base_metric_group = base_metric_group
+        self._job_parameters = job_parameters
 
     def get_metric_group(self) -> MetricGroup:
         """
@@ -51,6 +52,18 @@ class FunctionContext(object):
                                "metric with the 'python.metric.enabled' configuration.")
         return self._base_metric_group
 
+    def get_job_parameter(self, key: str, default_value: str) -> str:
+        """
+        Gets the global job parameter value associated with the given key as a string.
+
+        :param key: The key pointing to the associated value.
+        :param default_value: The default value which is returned in case global job parameter is
+                              null or there is no value associated with the given key.
+
+        .. versionadded:: 1.17.0
+        """
+        return self._job_parameters[key] if key in self._job_parameters else default_value
+
 
 class UserDefinedFunction(abc.ABC):
     """
diff --git a/flink-python/src/main/java/org/apache/flink/python/util/ProtoUtils.java b/flink-python/src/main/java/org/apache/flink/python/util/ProtoUtils.java
index 3de6dd85f2a..70ff78bd610 100644
--- a/flink-python/src/main/java/org/apache/flink/python/util/ProtoUtils.java
+++ b/flink-python/src/main/java/org/apache/flink/python/util/ProtoUtils.java
@@ -134,6 +134,7 @@ public enum ProtoUtils {
     // function utilities
 
     public static FlinkFnApi.UserDefinedFunctions createUserDefinedFunctionsProto(
+            RuntimeContext runtimeContext,
             PythonFunctionInfo[] userDefinedFunctions,
             boolean isMetricEnabled,
             boolean isProfileEnabled) {
@@ -144,6 +145,16 @@ public enum ProtoUtils {
         }
         builder.setMetricEnabled(isMetricEnabled);
         builder.setProfileEnabled(isProfileEnabled);
+        builder.addAllJobParameters(
+                runtimeContext.getExecutionConfig().getGlobalJobParameters().toMap().entrySet()
+                        .stream()
+                        .map(
+                                entry ->
+                                        FlinkFnApi.JobParameter.newBuilder()
+                                                .setKey(entry.getKey())
+                                                .setValue(entry.getValue())
+                                                .build())
+                        .collect(Collectors.toList()));
         return builder.build();
     }
 
@@ -259,8 +270,7 @@ public enum ProtoUtils {
                                         .entrySet().stream()
                                         .map(
                                                 entry ->
-                                                        FlinkFnApi.UserDefinedDataStreamFunction
-                                                                .JobParameter.newBuilder()
+                                                        FlinkFnApi.JobParameter.newBuilder()
                                                                 .setKey(entry.getKey())
                                                                 .setValue(entry.getValue())
                                                                 .build())
@@ -269,8 +279,7 @@ public enum ProtoUtils {
                                 internalParameters.entrySet().stream()
                                         .map(
                                                 entry ->
-                                                        FlinkFnApi.UserDefinedDataStreamFunction
-                                                                .JobParameter.newBuilder()
+                                                        FlinkFnApi.JobParameter.newBuilder()
                                                                 .setKey(entry.getKey())
                                                                 .setValue(entry.getValue())
                                                                 .build())
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamAggregateOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamAggregateOperator.java
index 495b7a4b3b2..fa1cd1e1ce5 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamAggregateOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamAggregateOperator.java
@@ -253,6 +253,16 @@ public abstract class AbstractPythonStreamAggregateOperator
                     ProtoUtils.createUserDefinedAggregateFunctionProto(
                             aggregateFunctions[i], specs));
         }
+        builder.addAllJobParameters(
+                getRuntimeContext().getExecutionConfig().getGlobalJobParameters().toMap().entrySet()
+                        .stream()
+                        .map(
+                                entry ->
+                                        FlinkFnApi.JobParameter.newBuilder()
+                                                .setKey(entry.getKey())
+                                                .setValue(entry.getValue())
+                                                .build())
+                        .collect(Collectors.toList()));
         return builder.build();
     }
 
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/AbstractArrowPythonAggregateFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/AbstractArrowPythonAggregateFunctionOperator.java
index 29b5ec965ce..feaa695e188 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/AbstractArrowPythonAggregateFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/AbstractArrowPythonAggregateFunctionOperator.java
@@ -150,6 +150,7 @@ public abstract class AbstractArrowPythonAggregateFunctionOperator
     @Override
     public FlinkFnApi.UserDefinedFunctions createUserDefinedFunctionsProto() {
         return ProtoUtils.createUserDefinedFunctionsProto(
+                getRuntimeContext(),
                 pandasAggFunctions,
                 config.get(PYTHON_METRIC_ENABLED),
                 config.get(PYTHON_PROFILE_ENABLED));
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperator.java
index ea82ef80305..fdc975c6d3a 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperator.java
@@ -36,6 +36,7 @@ import org.apache.flink.table.types.logical.RowType;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.ListIterator;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.python.PythonOptions.PYTHON_METRIC_ENABLED;
 import static org.apache.flink.python.PythonOptions.PYTHON_PROFILE_ENABLED;
@@ -263,6 +264,16 @@ public class BatchArrowPythonOverWindowAggregateFunctionOperator
         }
         builder.setMetricEnabled(config.get(PYTHON_METRIC_ENABLED));
         builder.setProfileEnabled(config.get(PYTHON_PROFILE_ENABLED));
+        builder.addAllJobParameters(
+                getRuntimeContext().getExecutionConfig().getGlobalJobParameters().toMap().entrySet()
+                        .stream()
+                        .map(
+                                entry ->
+                                        FlinkFnApi.JobParameter.newBuilder()
+                                                .setKey(entry.getKey())
+                                                .setValue(entry.getValue())
+                                                .build())
+                        .collect(Collectors.toList()));
         // add windows
         for (int i = 0; i < lowerBoundary.length; i++) {
             FlinkFnApi.OverWindow.Builder windowBuilder = FlinkFnApi.OverWindow.newBuilder();
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/AbstractPythonScalarFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/AbstractPythonScalarFunctionOperator.java
index 69a8ba6b4c3..71c4d473f41 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/AbstractPythonScalarFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/AbstractPythonScalarFunctionOperator.java
@@ -119,6 +119,7 @@ public abstract class AbstractPythonScalarFunctionOperator
     @Override
     public FlinkFnApi.UserDefinedFunctions createUserDefinedFunctionsProto() {
         return ProtoUtils.createUserDefinedFunctionsProto(
+                getRuntimeContext(),
                 scalarFunctions,
                 config.get(PYTHON_METRIC_ENABLED),
                 config.get(PYTHON_PROFILE_ENABLED));
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/EmbeddedPythonScalarFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/EmbeddedPythonScalarFunctionOperator.java
index 537b658c328..587b7c8635b 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/EmbeddedPythonScalarFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/EmbeddedPythonScalarFunctionOperator.java
@@ -119,6 +119,7 @@ public class EmbeddedPythonScalarFunctionOperator
         interpreter.set(
                 "proto",
                 ProtoUtils.createUserDefinedFunctionsProto(
+                                getRuntimeContext(),
                                 scalarFunctions,
                                 config.get(PYTHON_METRIC_ENABLED),
                                 config.get(PYTHON_PROFILE_ENABLED))
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/EmbeddedPythonTableFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/EmbeddedPythonTableFunctionOperator.java
index 756b1cb4645..77962daa67c 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/EmbeddedPythonTableFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/EmbeddedPythonTableFunctionOperator.java
@@ -111,6 +111,7 @@ public class EmbeddedPythonTableFunctionOperator extends AbstractEmbeddedStatele
         interpreter.set(
                 "proto",
                 ProtoUtils.createUserDefinedFunctionsProto(
+                                getRuntimeContext(),
                                 new PythonFunctionInfo[] {tableFunction},
                                 config.get(PYTHON_METRIC_ENABLED),
                                 config.get(PYTHON_PROFILE_ENABLED))
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java
index af47ca4bbfd..34acb94cb26 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java
@@ -157,6 +157,7 @@ public class PythonTableFunctionOperator
     @Override
     public FlinkFnApi.UserDefinedFunctions createUserDefinedFunctionsProto() {
         return ProtoUtils.createUserDefinedFunctionsProto(
+                getRuntimeContext(),
                 new PythonFunctionInfo[] {tableFunction},
                 config.get(PYTHON_METRIC_ENABLED),
                 config.get(PYTHON_PROFILE_ENABLED));