You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by di...@apache.org on 2023/01/12 09:13:56 UTC
[flink] branch master updated: [FLINK-14023][python] Support accessing job parameters in Python user-defined functions
This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 5cb434c36da [FLINK-14023][python] Support accessing job parameters in Python user-defined functions
5cb434c36da is described below
commit 5cb434c36dab9b3d1c73d0407fe09248c96de63c
Author: Dian Fu <di...@apache.org>
AuthorDate: Wed Jan 11 15:31:33 2023 +0800
[FLINK-14023][python] Support accessing job parameters in Python user-defined functions
This closes #21644.
---
.../docs/dev/python/table/udfs/overview.md | 30 +++
.../content/docs/dev/python/table/udfs/overview.md | 30 +++
.../pyflink/fn_execution/flink_fn_execution_pb2.py | 243 ++++++++++-----------
.../fn_execution/metrics/tests/test_metric.py | 2 +-
.../pyflink/fn_execution/table/operations.py | 6 +-
.../pyflink/proto/flink-fn-execution.proto | 12 +-
flink-python/pyflink/table/tests/test_udf.py | 29 ++-
flink-python/pyflink/table/udf.py | 15 +-
.../org/apache/flink/python/util/ProtoUtils.java | 17 +-
.../AbstractPythonStreamAggregateOperator.java | 10 +
...stractArrowPythonAggregateFunctionOperator.java | 1 +
...wPythonOverWindowAggregateFunctionOperator.java | 11 +
.../AbstractPythonScalarFunctionOperator.java | 1 +
.../EmbeddedPythonScalarFunctionOperator.java | 1 +
.../table/EmbeddedPythonTableFunctionOperator.java | 1 +
.../python/table/PythonTableFunctionOperator.java | 1 +
16 files changed, 269 insertions(+), 141 deletions(-)
diff --git a/docs/content.zh/docs/dev/python/table/udfs/overview.md b/docs/content.zh/docs/dev/python/table/udfs/overview.md
index 741e5960ba7..4e69ba92764 100644
--- a/docs/content.zh/docs/dev/python/table/udfs/overview.md
+++ b/docs/content.zh/docs/dev/python/table/udfs/overview.md
@@ -56,6 +56,36 @@ class Predict(ScalarFunction):
predict = udf(Predict(), result_type=DataTypes.DOUBLE(), func_type="pandas")
```
+## 访问作业参数
+
+The `open()` method provides a `FunctionContext` that contains information about the context in which
+user-defined functions are executed, such as the metric group, the global job parameters, etc.
+
+The following information can be obtained by calling the corresponding methods of `FunctionContext`:
+
+| Method | Description |
+| :--------------------------------------- | :---------------------------------------------------------------------- |
+| `get_metric_group()` | Metric group for this parallel subtask. |
+| `get_job_parameter(name, default_value)` | Global job parameter value associated with given key. |
+
+```python
+class HashCode(ScalarFunction):
+
+ def open(self, function_context: FunctionContext):
+ # access the global "hashcode_factor" parameter
+ # "12" would be the default value if the parameter does not exist
+ self.factor = int(function_context.get_job_parameter("hashcode_factor", "12"))
+
+ def eval(self, s: str):
+ return hash(s) * self.factor
+
+hash_code = udf(HashCode(), result_type=DataTypes.INT())
+TableEnvironment t_env = TableEnvironment.create(...)
+t_env.get_config().set('pipeline.global-job-parameters', 'hashcode_factor:31')
+t_env.create_temporary_system_function("hashCode", hash_code)
+t_env.sql_query("SELECT myField, hashCode(myField) FROM MyTable")
+```
+
## 测试自定义函数
假如你定义了如下 Python 自定义函数:
diff --git a/docs/content/docs/dev/python/table/udfs/overview.md b/docs/content/docs/dev/python/table/udfs/overview.md
index 97358acd238..b416a28ee98 100644
--- a/docs/content/docs/dev/python/table/udfs/overview.md
+++ b/docs/content/docs/dev/python/table/udfs/overview.md
@@ -62,6 +62,36 @@ class Predict(ScalarFunction):
predict = udf(Predict(), result_type=DataTypes.DOUBLE(), func_type="pandas")
```
+## Accessing job parameters
+
+The `open()` method provides a `FunctionContext` that contains information about the context in which
+user-defined functions are executed, such as the metric group, the global job parameters, etc.
+
+The following information can be obtained by calling the corresponding methods of `FunctionContext`:
+
+| Method | Description |
+| :--------------------------------------- | :---------------------------------------------------------------------- |
+| `get_metric_group()` | Metric group for this parallel subtask. |
+| `get_job_parameter(name, default_value)` | Global job parameter value associated with given key. |
+
+```python
+class HashCode(ScalarFunction):
+
+ def open(self, function_context: FunctionContext):
+ # access the global "hashcode_factor" parameter
+ # "12" would be the default value if the parameter does not exist
+ self.factor = int(function_context.get_job_parameter("hashcode_factor", "12"))
+
+ def eval(self, s: str):
+ return hash(s) * self.factor
+
+hash_code = udf(HashCode(), result_type=DataTypes.INT())
+TableEnvironment t_env = TableEnvironment.create(...)
+t_env.get_config().set('pipeline.global-job-parameters', 'hashcode_factor:31')
+t_env.create_temporary_system_function("hashCode", hash_code)
+t_env.sql_query("SELECT myField, hashCode(myField) FROM MyTable")
+```
+
## Testing User-Defined Functions
Suppose you have defined a Python user-defined function as following:
diff --git a/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py b/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py
index 95ca119137d..a2dc8575338 100644
--- a/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py
+++ b/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py
@@ -31,10 +31,11 @@ _sym_db = _symbol_database.Default()
-DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x18\x66link-fn-execution.proto\x12 org.apache.flink.fn_execution.v1\"\x86\x01\n\x05Input\x12\x44\n\x03udf\x18\x01 \x01(\x0b\x32\x35.org.apache.flink.fn_execution.v1.UserDefinedFunctionH\x00\x12\x15\n\x0binputOffset\x18\x02 \x01(\x05H\x00\x12\x17\n\rinputConstant\x18\x03 \x01(\x0cH\x00\x42\x07\n\x05input\"\xa8\x01\n\x13UserDefinedFunction\x12\x0f\n\x07payload\x18\x01 \x01(\x0c\x12\x37\n\x06inputs\x18\x02 \x03(\x0b\x32\'.org.ap [...]
+DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x18\x66link-fn-execution.proto\x12 org.apache.flink.fn_execution.v1\"*\n\x0cJobParameter\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t\"\x86\x01\n\x05Input\x12\x44\n\x03udf\x18\x01 \x01(\x0b\x32\x35.org.apache.flink.fn_execution.v1.UserDefinedFunctionH\x00\x12\x15\n\x0binputOffset\x18\x02 \x01(\x05H\x00\x12\x17\n\rinputConstant\x18\x03 \x01(\x0cH\x00\x42\x07\n\x05input\"\xa8\x01\n\x13UserDefinedFunction\x1 [...]
+_JOBPARAMETER = DESCRIPTOR.message_types_by_name['JobParameter']
_INPUT = DESCRIPTOR.message_types_by_name['Input']
_USERDEFINEDFUNCTION = DESCRIPTOR.message_types_by_name['UserDefinedFunction']
_USERDEFINEDFUNCTIONS = DESCRIPTOR.message_types_by_name['UserDefinedFunctions']
@@ -65,7 +66,6 @@ _TYPEINFO_ROWTYPEINFO_FIELD = _TYPEINFO_ROWTYPEINFO.nested_types_by_name['Field'
_TYPEINFO_TUPLETYPEINFO = _TYPEINFO.nested_types_by_name['TupleTypeInfo']
_TYPEINFO_AVROTYPEINFO = _TYPEINFO.nested_types_by_name['AvroTypeInfo']
_USERDEFINEDDATASTREAMFUNCTION = DESCRIPTOR.message_types_by_name['UserDefinedDataStreamFunction']
-_USERDEFINEDDATASTREAMFUNCTION_JOBPARAMETER = _USERDEFINEDDATASTREAMFUNCTION.nested_types_by_name['JobParameter']
_USERDEFINEDDATASTREAMFUNCTION_RUNTIMECONTEXT = _USERDEFINEDDATASTREAMFUNCTION.nested_types_by_name['RuntimeContext']
_STATEDESCRIPTOR = DESCRIPTOR.message_types_by_name['StateDescriptor']
_STATEDESCRIPTOR_STATETTLCONFIG = _STATEDESCRIPTOR.nested_types_by_name['StateTTLConfig']
@@ -91,6 +91,13 @@ _STATEDESCRIPTOR_STATETTLCONFIG_UPDATETYPE = _STATEDESCRIPTOR_STATETTLCONFIG.enu
_STATEDESCRIPTOR_STATETTLCONFIG_STATEVISIBILITY = _STATEDESCRIPTOR_STATETTLCONFIG.enum_types_by_name['StateVisibility']
_STATEDESCRIPTOR_STATETTLCONFIG_TTLTIMECHARACTERISTIC = _STATEDESCRIPTOR_STATETTLCONFIG.enum_types_by_name['TtlTimeCharacteristic']
_CODERINFODESCRIPTOR_MODE = _CODERINFODESCRIPTOR.enum_types_by_name['Mode']
+JobParameter = _reflection.GeneratedProtocolMessageType('JobParameter', (_message.Message,), {
+ 'DESCRIPTOR' : _JOBPARAMETER,
+ '__module__' : 'flink_fn_execution_pb2'
+ # @@protoc_insertion_point(class_scope:org.apache.flink.fn_execution.v1.JobParameter)
+ })
+_sym_db.RegisterMessage(JobParameter)
+
Input = _reflection.GeneratedProtocolMessageType('Input', (_message.Message,), {
'DESCRIPTOR' : _INPUT,
'__module__' : 'flink_fn_execution_pb2'
@@ -316,13 +323,6 @@ _sym_db.RegisterMessage(TypeInfo.AvroTypeInfo)
UserDefinedDataStreamFunction = _reflection.GeneratedProtocolMessageType('UserDefinedDataStreamFunction', (_message.Message,), {
- 'JobParameter' : _reflection.GeneratedProtocolMessageType('JobParameter', (_message.Message,), {
- 'DESCRIPTOR' : _USERDEFINEDDATASTREAMFUNCTION_JOBPARAMETER,
- '__module__' : 'flink_fn_execution_pb2'
- # @@protoc_insertion_point(class_scope:org.apache.flink.fn_execution.v1.UserDefinedDataStreamFunction.JobParameter)
- })
- ,
-
'RuntimeContext' : _reflection.GeneratedProtocolMessageType('RuntimeContext', (_message.Message,), {
'DESCRIPTOR' : _USERDEFINEDDATASTREAMFUNCTION_RUNTIMECONTEXT,
'__module__' : 'flink_fn_execution_pb2'
@@ -334,7 +334,6 @@ UserDefinedDataStreamFunction = _reflection.GeneratedProtocolMessageType('UserDe
# @@protoc_insertion_point(class_scope:org.apache.flink.fn_execution.v1.UserDefinedDataStreamFunction)
})
_sym_db.RegisterMessage(UserDefinedDataStreamFunction)
-_sym_db.RegisterMessage(UserDefinedDataStreamFunction.JobParameter)
_sym_db.RegisterMessage(UserDefinedDataStreamFunction.RuntimeContext)
StateDescriptor = _reflection.GeneratedProtocolMessageType('StateDescriptor', (_message.Message,), {
@@ -435,116 +434,116 @@ if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
DESCRIPTOR._serialized_options = b'\n\037org.apache.flink.fnexecution.v1B\nFlinkFnApi'
- _INPUT._serialized_start=63
- _INPUT._serialized_end=197
- _USERDEFINEDFUNCTION._serialized_start=200
- _USERDEFINEDFUNCTION._serialized_end=368
- _USERDEFINEDFUNCTIONS._serialized_start=371
- _USERDEFINEDFUNCTIONS._serialized_end=574
- _OVERWINDOW._serialized_start=577
- _OVERWINDOW._serialized_end=926
- _OVERWINDOW_WINDOWTYPE._serialized_start=718
- _OVERWINDOW_WINDOWTYPE._serialized_end=926
- _USERDEFINEDAGGREGATEFUNCTION._serialized_start=929
- _USERDEFINEDAGGREGATEFUNCTION._serialized_end=1708
- _USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC._serialized_start=1194
- _USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC._serialized_end=1708
- _USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC_LISTVIEW._serialized_start=1457
- _USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC_LISTVIEW._serialized_end=1541
- _USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC_MAPVIEW._serialized_start=1544
- _USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC_MAPVIEW._serialized_end=1695
- _GROUPWINDOW._serialized_start=1711
- _GROUPWINDOW._serialized_end=2267
- _GROUPWINDOW_WINDOWTYPE._serialized_start=2075
- _GROUPWINDOW_WINDOWTYPE._serialized_end=2166
- _GROUPWINDOW_WINDOWPROPERTY._serialized_start=2168
- _GROUPWINDOW_WINDOWPROPERTY._serialized_end=2267
- _USERDEFINEDAGGREGATEFUNCTIONS._serialized_start=2270
- _USERDEFINEDAGGREGATEFUNCTIONS._serialized_end=2804
- _SCHEMA._serialized_start=2807
- _SCHEMA._serialized_end=4845
- _SCHEMA_MAPINFO._serialized_start=2882
- _SCHEMA_MAPINFO._serialized_end=3033
- _SCHEMA_TIMEINFO._serialized_start=3035
- _SCHEMA_TIMEINFO._serialized_end=3064
- _SCHEMA_TIMESTAMPINFO._serialized_start=3066
- _SCHEMA_TIMESTAMPINFO._serialized_end=3100
- _SCHEMA_LOCALZONEDTIMESTAMPINFO._serialized_start=3102
- _SCHEMA_LOCALZONEDTIMESTAMPINFO._serialized_end=3146
- _SCHEMA_ZONEDTIMESTAMPINFO._serialized_start=3148
- _SCHEMA_ZONEDTIMESTAMPINFO._serialized_end=3187
- _SCHEMA_DECIMALINFO._serialized_start=3189
- _SCHEMA_DECIMALINFO._serialized_end=3236
- _SCHEMA_BINARYINFO._serialized_start=3238
- _SCHEMA_BINARYINFO._serialized_end=3266
- _SCHEMA_VARBINARYINFO._serialized_start=3268
- _SCHEMA_VARBINARYINFO._serialized_end=3299
- _SCHEMA_CHARINFO._serialized_start=3301
- _SCHEMA_CHARINFO._serialized_end=3327
- _SCHEMA_VARCHARINFO._serialized_start=3329
- _SCHEMA_VARCHARINFO._serialized_end=3358
- _SCHEMA_FIELDTYPE._serialized_start=3361
- _SCHEMA_FIELDTYPE._serialized_end=4433
- _SCHEMA_FIELD._serialized_start=4435
- _SCHEMA_FIELD._serialized_end=4543
- _SCHEMA_TYPENAME._serialized_start=4546
- _SCHEMA_TYPENAME._serialized_end=4845
- _TYPEINFO._serialized_start=4848
- _TYPEINFO._serialized_end=6195
- _TYPEINFO_MAPTYPEINFO._serialized_start=5342
- _TYPEINFO_MAPTYPEINFO._serialized_end=5481
- _TYPEINFO_ROWTYPEINFO._serialized_start=5484
- _TYPEINFO_ROWTYPEINFO._serialized_end=5668
- _TYPEINFO_ROWTYPEINFO_FIELD._serialized_start=5577
- _TYPEINFO_ROWTYPEINFO_FIELD._serialized_end=5668
- _TYPEINFO_TUPLETYPEINFO._serialized_start=5670
- _TYPEINFO_TUPLETYPEINFO._serialized_end=5750
- _TYPEINFO_AVROTYPEINFO._serialized_start=5752
- _TYPEINFO_AVROTYPEINFO._serialized_end=5782
- _TYPEINFO_TYPENAME._serialized_start=5785
- _TYPEINFO_TYPENAME._serialized_end=6182
- _USERDEFINEDDATASTREAMFUNCTION._serialized_start=6198
- _USERDEFINEDDATASTREAMFUNCTION._serialized_end=7249
- _USERDEFINEDDATASTREAMFUNCTION_JOBPARAMETER._serialized_start=6692
- _USERDEFINEDDATASTREAMFUNCTION_JOBPARAMETER._serialized_end=6734
- _USERDEFINEDDATASTREAMFUNCTION_RUNTIMECONTEXT._serialized_start=6737
- _USERDEFINEDDATASTREAMFUNCTION_RUNTIMECONTEXT._serialized_end=7073
- _USERDEFINEDDATASTREAMFUNCTION_FUNCTIONTYPE._serialized_start=7076
- _USERDEFINEDDATASTREAMFUNCTION_FUNCTIONTYPE._serialized_end=7249
- _STATEDESCRIPTOR._serialized_start=7252
- _STATEDESCRIPTOR._serialized_end=9144
- _STATEDESCRIPTOR_STATETTLCONFIG._serialized_start=7384
- _STATEDESCRIPTOR_STATETTLCONFIG._serialized_end=9144
- _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES._serialized_start=7855
- _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES._serialized_end=8953
- _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_INCREMENTALCLEANUPSTRATEGY._serialized_start=8033
- _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_INCREMENTALCLEANUPSTRATEGY._serialized_end=8121
- _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_ROCKSDBCOMPACTFILTERCLEANUPSTRATEGY._serialized_start=8123
- _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_ROCKSDBCOMPACTFILTERCLEANUPSTRATEGY._serialized_end=8198
- _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_MAPSTRATEGIESENTRY._serialized_start=8201
- _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_MAPSTRATEGIESENTRY._serialized_end=8809
- _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_STRATEGIES._serialized_start=8811
- _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_STRATEGIES._serialized_end=8909
- _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_EMPTYCLEANUPSTRATEGY._serialized_start=8911
- _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_EMPTYCLEANUPSTRATEGY._serialized_end=8953
- _STATEDESCRIPTOR_STATETTLCONFIG_UPDATETYPE._serialized_start=8955
- _STATEDESCRIPTOR_STATETTLCONFIG_UPDATETYPE._serialized_end=9023
- _STATEDESCRIPTOR_STATETTLCONFIG_STATEVISIBILITY._serialized_start=9025
- _STATEDESCRIPTOR_STATETTLCONFIG_STATEVISIBILITY._serialized_end=9099
- _STATEDESCRIPTOR_STATETTLCONFIG_TTLTIMECHARACTERISTIC._serialized_start=9101
- _STATEDESCRIPTOR_STATETTLCONFIG_TTLTIMECHARACTERISTIC._serialized_end=9144
- _CODERINFODESCRIPTOR._serialized_start=9147
- _CODERINFODESCRIPTOR._serialized_end=10156
- _CODERINFODESCRIPTOR_FLATTENROWTYPE._serialized_start=9740
- _CODERINFODESCRIPTOR_FLATTENROWTYPE._serialized_end=9814
- _CODERINFODESCRIPTOR_ROWTYPE._serialized_start=9816
- _CODERINFODESCRIPTOR_ROWTYPE._serialized_end=9883
- _CODERINFODESCRIPTOR_ARROWTYPE._serialized_start=9885
- _CODERINFODESCRIPTOR_ARROWTYPE._serialized_end=9954
- _CODERINFODESCRIPTOR_OVERWINDOWARROWTYPE._serialized_start=9956
- _CODERINFODESCRIPTOR_OVERWINDOWARROWTYPE._serialized_end=10035
- _CODERINFODESCRIPTOR_RAWTYPE._serialized_start=10037
- _CODERINFODESCRIPTOR_RAWTYPE._serialized_end=10109
- _CODERINFODESCRIPTOR_MODE._serialized_start=10111
- _CODERINFODESCRIPTOR_MODE._serialized_end=10143
+ _JOBPARAMETER._serialized_start=62
+ _JOBPARAMETER._serialized_end=104
+ _INPUT._serialized_start=107
+ _INPUT._serialized_end=241
+ _USERDEFINEDFUNCTION._serialized_start=244
+ _USERDEFINEDFUNCTION._serialized_end=412
+ _USERDEFINEDFUNCTIONS._serialized_start=415
+ _USERDEFINEDFUNCTIONS._serialized_end=690
+ _OVERWINDOW._serialized_start=693
+ _OVERWINDOW._serialized_end=1042
+ _OVERWINDOW_WINDOWTYPE._serialized_start=834
+ _OVERWINDOW_WINDOWTYPE._serialized_end=1042
+ _USERDEFINEDAGGREGATEFUNCTION._serialized_start=1045
+ _USERDEFINEDAGGREGATEFUNCTION._serialized_end=1824
+ _USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC._serialized_start=1310
+ _USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC._serialized_end=1824
+ _USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC_LISTVIEW._serialized_start=1573
+ _USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC_LISTVIEW._serialized_end=1657
+ _USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC_MAPVIEW._serialized_start=1660
+ _USERDEFINEDAGGREGATEFUNCTION_DATAVIEWSPEC_MAPVIEW._serialized_end=1811
+ _GROUPWINDOW._serialized_start=1827
+ _GROUPWINDOW._serialized_end=2383
+ _GROUPWINDOW_WINDOWTYPE._serialized_start=2191
+ _GROUPWINDOW_WINDOWTYPE._serialized_end=2282
+ _GROUPWINDOW_WINDOWPROPERTY._serialized_start=2284
+ _GROUPWINDOW_WINDOWPROPERTY._serialized_end=2383
+ _USERDEFINEDAGGREGATEFUNCTIONS._serialized_start=2386
+ _USERDEFINEDAGGREGATEFUNCTIONS._serialized_end=2992
+ _SCHEMA._serialized_start=2995
+ _SCHEMA._serialized_end=5033
+ _SCHEMA_MAPINFO._serialized_start=3070
+ _SCHEMA_MAPINFO._serialized_end=3221
+ _SCHEMA_TIMEINFO._serialized_start=3223
+ _SCHEMA_TIMEINFO._serialized_end=3252
+ _SCHEMA_TIMESTAMPINFO._serialized_start=3254
+ _SCHEMA_TIMESTAMPINFO._serialized_end=3288
+ _SCHEMA_LOCALZONEDTIMESTAMPINFO._serialized_start=3290
+ _SCHEMA_LOCALZONEDTIMESTAMPINFO._serialized_end=3334
+ _SCHEMA_ZONEDTIMESTAMPINFO._serialized_start=3336
+ _SCHEMA_ZONEDTIMESTAMPINFO._serialized_end=3375
+ _SCHEMA_DECIMALINFO._serialized_start=3377
+ _SCHEMA_DECIMALINFO._serialized_end=3424
+ _SCHEMA_BINARYINFO._serialized_start=3426
+ _SCHEMA_BINARYINFO._serialized_end=3454
+ _SCHEMA_VARBINARYINFO._serialized_start=3456
+ _SCHEMA_VARBINARYINFO._serialized_end=3487
+ _SCHEMA_CHARINFO._serialized_start=3489
+ _SCHEMA_CHARINFO._serialized_end=3515
+ _SCHEMA_VARCHARINFO._serialized_start=3517
+ _SCHEMA_VARCHARINFO._serialized_end=3546
+ _SCHEMA_FIELDTYPE._serialized_start=3549
+ _SCHEMA_FIELDTYPE._serialized_end=4621
+ _SCHEMA_FIELD._serialized_start=4623
+ _SCHEMA_FIELD._serialized_end=4731
+ _SCHEMA_TYPENAME._serialized_start=4734
+ _SCHEMA_TYPENAME._serialized_end=5033
+ _TYPEINFO._serialized_start=5036
+ _TYPEINFO._serialized_end=6383
+ _TYPEINFO_MAPTYPEINFO._serialized_start=5530
+ _TYPEINFO_MAPTYPEINFO._serialized_end=5669
+ _TYPEINFO_ROWTYPEINFO._serialized_start=5672
+ _TYPEINFO_ROWTYPEINFO._serialized_end=5856
+ _TYPEINFO_ROWTYPEINFO_FIELD._serialized_start=5765
+ _TYPEINFO_ROWTYPEINFO_FIELD._serialized_end=5856
+ _TYPEINFO_TUPLETYPEINFO._serialized_start=5858
+ _TYPEINFO_TUPLETYPEINFO._serialized_end=5938
+ _TYPEINFO_AVROTYPEINFO._serialized_start=5940
+ _TYPEINFO_AVROTYPEINFO._serialized_end=5970
+ _TYPEINFO_TYPENAME._serialized_start=5973
+ _TYPEINFO_TYPENAME._serialized_end=6370
+ _USERDEFINEDDATASTREAMFUNCTION._serialized_start=6386
+ _USERDEFINEDDATASTREAMFUNCTION._serialized_end=7363
+ _USERDEFINEDDATASTREAMFUNCTION_RUNTIMECONTEXT._serialized_start=6881
+ _USERDEFINEDDATASTREAMFUNCTION_RUNTIMECONTEXT._serialized_end=7187
+ _USERDEFINEDDATASTREAMFUNCTION_FUNCTIONTYPE._serialized_start=7190
+ _USERDEFINEDDATASTREAMFUNCTION_FUNCTIONTYPE._serialized_end=7363
+ _STATEDESCRIPTOR._serialized_start=7366
+ _STATEDESCRIPTOR._serialized_end=9258
+ _STATEDESCRIPTOR_STATETTLCONFIG._serialized_start=7498
+ _STATEDESCRIPTOR_STATETTLCONFIG._serialized_end=9258
+ _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES._serialized_start=7969
+ _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES._serialized_end=9067
+ _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_INCREMENTALCLEANUPSTRATEGY._serialized_start=8147
+ _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_INCREMENTALCLEANUPSTRATEGY._serialized_end=8235
+ _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_ROCKSDBCOMPACTFILTERCLEANUPSTRATEGY._serialized_start=8237
+ _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_ROCKSDBCOMPACTFILTERCLEANUPSTRATEGY._serialized_end=8312
+ _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_MAPSTRATEGIESENTRY._serialized_start=8315
+ _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_MAPSTRATEGIESENTRY._serialized_end=8923
+ _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_STRATEGIES._serialized_start=8925
+ _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_STRATEGIES._serialized_end=9023
+ _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_EMPTYCLEANUPSTRATEGY._serialized_start=9025
+ _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_EMPTYCLEANUPSTRATEGY._serialized_end=9067
+ _STATEDESCRIPTOR_STATETTLCONFIG_UPDATETYPE._serialized_start=9069
+ _STATEDESCRIPTOR_STATETTLCONFIG_UPDATETYPE._serialized_end=9137
+ _STATEDESCRIPTOR_STATETTLCONFIG_STATEVISIBILITY._serialized_start=9139
+ _STATEDESCRIPTOR_STATETTLCONFIG_STATEVISIBILITY._serialized_end=9213
+ _STATEDESCRIPTOR_STATETTLCONFIG_TTLTIMECHARACTERISTIC._serialized_start=9215
+ _STATEDESCRIPTOR_STATETTLCONFIG_TTLTIMECHARACTERISTIC._serialized_end=9258
+ _CODERINFODESCRIPTOR._serialized_start=9261
+ _CODERINFODESCRIPTOR._serialized_end=10270
+ _CODERINFODESCRIPTOR_FLATTENROWTYPE._serialized_start=9854
+ _CODERINFODESCRIPTOR_FLATTENROWTYPE._serialized_end=9928
+ _CODERINFODESCRIPTOR_ROWTYPE._serialized_start=9930
+ _CODERINFODESCRIPTOR_ROWTYPE._serialized_end=9997
+ _CODERINFODESCRIPTOR_ARROWTYPE._serialized_start=9999
+ _CODERINFODESCRIPTOR_ARROWTYPE._serialized_end=10068
+ _CODERINFODESCRIPTOR_OVERWINDOWARROWTYPE._serialized_start=10070
+ _CODERINFODESCRIPTOR_OVERWINDOWARROWTYPE._serialized_end=10149
+ _CODERINFODESCRIPTOR_RAWTYPE._serialized_start=10151
+ _CODERINFODESCRIPTOR_RAWTYPE._serialized_end=10223
+ _CODERINFODESCRIPTOR_MODE._serialized_start=10225
+ _CODERINFODESCRIPTOR_MODE._serialized_end=10257
# @@protoc_insertion_point(module_scope)
diff --git a/flink-python/pyflink/fn_execution/metrics/tests/test_metric.py b/flink-python/pyflink/fn_execution/metrics/tests/test_metric.py
index ea1f4eb635d..7fba25cb87e 100644
--- a/flink-python/pyflink/fn_execution/metrics/tests/test_metric.py
+++ b/flink-python/pyflink/fn_execution/metrics/tests/test_metric.py
@@ -48,7 +48,7 @@ class MetricTests(PyFlinkTestCase):
self.assertEqual(MetricTests.print_metric_group_path(new_group), 'root.key.value')
def test_metric_not_enabled(self):
- fc = FunctionContext(None)
+ fc = FunctionContext(None, None)
with self.assertRaises(RuntimeError):
fc.get_metric_group()
diff --git a/flink-python/pyflink/fn_execution/table/operations.py b/flink-python/pyflink/fn_execution/table/operations.py
index 4534d3a6849..8a0924dccdc 100644
--- a/flink-python/pyflink/fn_execution/table/operations.py
+++ b/flink-python/pyflink/fn_execution/table/operations.py
@@ -83,6 +83,7 @@ class BaseOperation(Operation):
else:
self.base_metric_group = None
self.func, self.user_defined_funcs = self.generate_func(serialized_fn)
+ self.job_parameters = {p.key: p.value for p in serialized_fn.job_parameters}
def finish(self):
self._update_gauge(self.base_metric_group)
@@ -102,7 +103,7 @@ class BaseOperation(Operation):
def open(self):
for user_defined_func in self.user_defined_funcs:
if hasattr(user_defined_func, 'open'):
- user_defined_func.open(FunctionContext(self.base_metric_group))
+ user_defined_func.open(FunctionContext(self.base_metric_group, self.job_parameters))
def close(self):
for user_defined_func in self.user_defined_funcs:
@@ -323,11 +324,12 @@ class AbstractStreamGroupAggregateOperation(BaseStatefulOperation):
self.state_cache_size = serialized_fn.state_cache_size
self.state_cleaning_enabled = serialized_fn.state_cleaning_enabled
self.data_view_specs = extract_data_view_specs(serialized_fn.udfs)
+ self.job_parameters = {p.key: p.value for p in serialized_fn.job_parameters}
super(AbstractStreamGroupAggregateOperation, self).__init__(
serialized_fn, keyed_state_backend)
def open(self):
- self.group_agg_function.open(FunctionContext(self.base_metric_group))
+ self.group_agg_function.open(FunctionContext(self.base_metric_group, self.job_parameters))
def close(self):
self.group_agg_function.close()
diff --git a/flink-python/pyflink/proto/flink-fn-execution.proto b/flink-python/pyflink/proto/flink-fn-execution.proto
index db27aed7409..468b8524595 100644
--- a/flink-python/pyflink/proto/flink-fn-execution.proto
+++ b/flink-python/pyflink/proto/flink-fn-execution.proto
@@ -25,6 +25,11 @@ package org.apache.flink.fn_execution.v1;
option java_package = "org.apache.flink.fnexecution.v1";
option java_outer_classname = "FlinkFnApi";
+message JobParameter {
+ string key = 1;
+ string value = 2;
+}
+
// ------------------------------------------------------------------------
// Table API & SQL
// ------------------------------------------------------------------------
@@ -65,6 +70,7 @@ message UserDefinedFunctions {
bool metric_enabled = 2;
repeated OverWindow windows = 3;
bool profile_enabled = 4;
+ repeated JobParameter job_parameters = 5;
}
// Used to describe the info of over window in pandas batch over window aggregation
@@ -182,6 +188,7 @@ message UserDefinedAggregateFunctions {
GroupWindow group_window = 12;
bool profile_enabled = 13;
+ repeated JobParameter job_parameters = 14;
}
// A representation of the data schema.
@@ -362,11 +369,6 @@ message UserDefinedDataStreamFunction {
REVISE_OUTPUT = 100;
}
- message JobParameter {
- string key = 1;
- string value = 2;
- }
-
message RuntimeContext {
string task_name = 1;
string task_name_with_subtasks = 2;
diff --git a/flink-python/pyflink/table/tests/test_udf.py b/flink-python/pyflink/table/tests/test_udf.py
index 699786b73d2..6f249e5dde5 100644
--- a/flink-python/pyflink/table/tests/test_udf.py
+++ b/flink-python/pyflink/table/tests/test_udf.py
@@ -26,7 +26,7 @@ import pytz
from pyflink.table import DataTypes, expressions as expr
from pyflink.table.expressions import call
-from pyflink.table.udf import ScalarFunction, udf
+from pyflink.table.udf import ScalarFunction, udf, FunctionContext
from pyflink.testing import source_sink_utils
from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase, \
PyFlinkBatchTableTestCase
@@ -41,12 +41,15 @@ class UserDefinedFunctionTests(object):
def test_scalar_function(self):
# test metric disabled.
self.t_env.get_config().set('python.metric.enabled', 'false')
+ self.t_env.get_config().set('pipeline.global-job-parameters', 'subtract_value:2')
# test lambda function
add_one = udf(lambda i: i + 1, result_type=DataTypes.BIGINT())
# test Python ScalarFunction
subtract_one = udf(SubtractOne(), result_type=DataTypes.BIGINT())
+ subtract_two = udf(SubtractWithParameters(), result_type=DataTypes.BIGINT())
+
# test callable function
add_one_callable = udf(CallablePlus(), result_type=DataTypes.BIGINT())
@@ -68,7 +71,7 @@ class UserDefinedFunctionTests(object):
sink_table = generate_random_table_name()
sink_table_ddl = f"""
CREATE TABLE {sink_table}(a BIGINT, b BIGINT, c BIGINT, d BIGINT, e BIGINT, f BIGINT,
- g BIGINT) WITH ('connector'='test-sink')
+ g BIGINT, h BIGINT) WITH ('connector'='test-sink')
"""
self.t_env.execute_sql(sink_table_ddl)
@@ -76,11 +79,16 @@ class UserDefinedFunctionTests(object):
t = self.t_env.from_elements([(1, 2, 3), (2, 5, 6), (3, 1, 9)], ['a', 'b', 'c'])
t.where(add_one(t.b) <= 3).select(
- add_one(t.a), subtract_one(t.b), add(t.a, t.c), add_one_callable(t.a),
- add_one_partial(t.a), check_memory_limit(execution_mode), t.a) \
- .execute_insert(sink_table).wait()
+ add_one(t.a),
+ subtract_one(t.b),
+ subtract_two(t.b),
+ add(t.a, t.c),
+ add_one_callable(t.a),
+ add_one_partial(t.a),
+ check_memory_limit(execution_mode),
+ t.a).execute_insert(sink_table).wait()
actual = source_sink_utils.results()
- self.assert_equals(actual, ["+I[2, 1, 4, 2, 2, 1, 1]", "+I[4, 0, 12, 4, 4, 1, 3]"])
+ self.assert_equals(actual, ["+I[2, 1, 0, 4, 2, 2, 1, 1]", "+I[4, 0, -1, 12, 4, 4, 1, 3]"])
def test_chaining_scalar_function(self):
add_one = udf(lambda i: i + 1, result_type=DataTypes.BIGINT())
@@ -1010,6 +1018,15 @@ class SubtractOne(ScalarFunction):
return i - 1
+class SubtractWithParameters(ScalarFunction):
+
+ def open(self, function_context: FunctionContext):
+ self.subtract_value = int(function_context.get_job_parameter("subtract_value", "1"))
+
+ def eval(self, i):
+ return i - self.subtract_value
+
+
class SubtractWithMetrics(ScalarFunction, unittest.TestCase):
def open(self, function_context):
diff --git a/flink-python/pyflink/table/udf.py b/flink-python/pyflink/table/udf.py
index 07840423321..cd19fcfcef6 100644
--- a/flink-python/pyflink/table/udf.py
+++ b/flink-python/pyflink/table/udf.py
@@ -37,8 +37,9 @@ class FunctionContext(object):
and global job parameters, etc.
"""
- def __init__(self, base_metric_group):
+ def __init__(self, base_metric_group, job_parameters):
self._base_metric_group = base_metric_group
+ self._job_parameters = job_parameters
def get_metric_group(self) -> MetricGroup:
"""
@@ -51,6 +52,18 @@ class FunctionContext(object):
"metric with the 'python.metric.enabled' configuration.")
return self._base_metric_group
+ def get_job_parameter(self, key: str, default_value: str) -> str:
+ """
+ Gets the global job parameter value associated with the given key as a string.
+
+ :param key: The key pointing to the associated value.
+ :param default_value: The default value which is returned in case global job parameter is
+ null or there is no value associated with the given key.
+
+ .. versionadded:: 1.17.0
+ """
+ return self._job_parameters[key] if key in self._job_parameters else default_value
+
class UserDefinedFunction(abc.ABC):
"""
diff --git a/flink-python/src/main/java/org/apache/flink/python/util/ProtoUtils.java b/flink-python/src/main/java/org/apache/flink/python/util/ProtoUtils.java
index 3de6dd85f2a..70ff78bd610 100644
--- a/flink-python/src/main/java/org/apache/flink/python/util/ProtoUtils.java
+++ b/flink-python/src/main/java/org/apache/flink/python/util/ProtoUtils.java
@@ -134,6 +134,7 @@ public enum ProtoUtils {
// function utilities
public static FlinkFnApi.UserDefinedFunctions createUserDefinedFunctionsProto(
+ RuntimeContext runtimeContext,
PythonFunctionInfo[] userDefinedFunctions,
boolean isMetricEnabled,
boolean isProfileEnabled) {
@@ -144,6 +145,16 @@ public enum ProtoUtils {
}
builder.setMetricEnabled(isMetricEnabled);
builder.setProfileEnabled(isProfileEnabled);
+ builder.addAllJobParameters(
+ runtimeContext.getExecutionConfig().getGlobalJobParameters().toMap().entrySet()
+ .stream()
+ .map(
+ entry ->
+ FlinkFnApi.JobParameter.newBuilder()
+ .setKey(entry.getKey())
+ .setValue(entry.getValue())
+ .build())
+ .collect(Collectors.toList()));
return builder.build();
}
@@ -259,8 +270,7 @@ public enum ProtoUtils {
.entrySet().stream()
.map(
entry ->
- FlinkFnApi.UserDefinedDataStreamFunction
- .JobParameter.newBuilder()
+ FlinkFnApi.JobParameter.newBuilder()
.setKey(entry.getKey())
.setValue(entry.getValue())
.build())
@@ -269,8 +279,7 @@ public enum ProtoUtils {
internalParameters.entrySet().stream()
.map(
entry ->
- FlinkFnApi.UserDefinedDataStreamFunction
- .JobParameter.newBuilder()
+ FlinkFnApi.JobParameter.newBuilder()
.setKey(entry.getKey())
.setValue(entry.getValue())
.build())
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamAggregateOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamAggregateOperator.java
index 495b7a4b3b2..fa1cd1e1ce5 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamAggregateOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamAggregateOperator.java
@@ -253,6 +253,16 @@ public abstract class AbstractPythonStreamAggregateOperator
ProtoUtils.createUserDefinedAggregateFunctionProto(
aggregateFunctions[i], specs));
}
+ builder.addAllJobParameters(
+ getRuntimeContext().getExecutionConfig().getGlobalJobParameters().toMap().entrySet()
+ .stream()
+ .map(
+ entry ->
+ FlinkFnApi.JobParameter.newBuilder()
+ .setKey(entry.getKey())
+ .setValue(entry.getValue())
+ .build())
+ .collect(Collectors.toList()));
return builder.build();
}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/AbstractArrowPythonAggregateFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/AbstractArrowPythonAggregateFunctionOperator.java
index 29b5ec965ce..feaa695e188 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/AbstractArrowPythonAggregateFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/AbstractArrowPythonAggregateFunctionOperator.java
@@ -150,6 +150,7 @@ public abstract class AbstractArrowPythonAggregateFunctionOperator
@Override
public FlinkFnApi.UserDefinedFunctions createUserDefinedFunctionsProto() {
return ProtoUtils.createUserDefinedFunctionsProto(
+ getRuntimeContext(),
pandasAggFunctions,
config.get(PYTHON_METRIC_ENABLED),
config.get(PYTHON_PROFILE_ENABLED));
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperator.java
index ea82ef80305..fdc975c6d3a 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperator.java
@@ -36,6 +36,7 @@ import org.apache.flink.table.types.logical.RowType;
import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
+import java.util.stream.Collectors;
import static org.apache.flink.python.PythonOptions.PYTHON_METRIC_ENABLED;
import static org.apache.flink.python.PythonOptions.PYTHON_PROFILE_ENABLED;
@@ -263,6 +264,16 @@ public class BatchArrowPythonOverWindowAggregateFunctionOperator
}
builder.setMetricEnabled(config.get(PYTHON_METRIC_ENABLED));
builder.setProfileEnabled(config.get(PYTHON_PROFILE_ENABLED));
+ builder.addAllJobParameters(
+ getRuntimeContext().getExecutionConfig().getGlobalJobParameters().toMap().entrySet()
+ .stream()
+ .map(
+ entry ->
+ FlinkFnApi.JobParameter.newBuilder()
+ .setKey(entry.getKey())
+ .setValue(entry.getValue())
+ .build())
+ .collect(Collectors.toList()));
// add windows
for (int i = 0; i < lowerBoundary.length; i++) {
FlinkFnApi.OverWindow.Builder windowBuilder = FlinkFnApi.OverWindow.newBuilder();
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/AbstractPythonScalarFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/AbstractPythonScalarFunctionOperator.java
index 69a8ba6b4c3..71c4d473f41 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/AbstractPythonScalarFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/AbstractPythonScalarFunctionOperator.java
@@ -119,6 +119,7 @@ public abstract class AbstractPythonScalarFunctionOperator
@Override
public FlinkFnApi.UserDefinedFunctions createUserDefinedFunctionsProto() {
return ProtoUtils.createUserDefinedFunctionsProto(
+ getRuntimeContext(),
scalarFunctions,
config.get(PYTHON_METRIC_ENABLED),
config.get(PYTHON_PROFILE_ENABLED));
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/EmbeddedPythonScalarFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/EmbeddedPythonScalarFunctionOperator.java
index 537b658c328..587b7c8635b 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/EmbeddedPythonScalarFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/EmbeddedPythonScalarFunctionOperator.java
@@ -119,6 +119,7 @@ public class EmbeddedPythonScalarFunctionOperator
interpreter.set(
"proto",
ProtoUtils.createUserDefinedFunctionsProto(
+ getRuntimeContext(),
scalarFunctions,
config.get(PYTHON_METRIC_ENABLED),
config.get(PYTHON_PROFILE_ENABLED))
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/EmbeddedPythonTableFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/EmbeddedPythonTableFunctionOperator.java
index 756b1cb4645..77962daa67c 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/EmbeddedPythonTableFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/EmbeddedPythonTableFunctionOperator.java
@@ -111,6 +111,7 @@ public class EmbeddedPythonTableFunctionOperator extends AbstractEmbeddedStatele
interpreter.set(
"proto",
ProtoUtils.createUserDefinedFunctionsProto(
+ getRuntimeContext(),
new PythonFunctionInfo[] {tableFunction},
config.get(PYTHON_METRIC_ENABLED),
config.get(PYTHON_PROFILE_ENABLED))
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java
index af47ca4bbfd..34acb94cb26 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java
@@ -157,6 +157,7 @@ public class PythonTableFunctionOperator
@Override
public FlinkFnApi.UserDefinedFunctions createUserDefinedFunctionsProto() {
return ProtoUtils.createUserDefinedFunctionsProto(
+ getRuntimeContext(),
new PythonFunctionInfo[] {tableFunction},
config.get(PYTHON_METRIC_ENABLED),
config.get(PYTHON_PROFILE_ENABLED));