You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/11/22 01:57:13 UTC
[pulsar] branch master updated: Support key base for python function (#8540)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 334dffd Support key base for python function (#8540)
334dffd is described below
commit 334dffdb95035349cc0fdbc9f84a15d2422a825b
Author: xiaolong ran <rx...@apache.org>
AuthorDate: Sun Nov 22 09:56:45 2020 +0800
Support key base for python function (#8540)
### Motivation
This change is based on #8523.
### Modifications
Support key_base for Python functions.
---
.../instance/src/main/python/Function_pb2.py | 234 ++++++++++++----
.../src/main/python/InstanceCommunication_pb2.py | 306 +++++++++++++--------
.../main/python/InstanceCommunication_pb2_grpc.py | 286 ++++++++++++-------
.../instance/src/main/python/python_instance.py | 8 +
.../integration/functions/PulsarFunctionsTest.java | 3 +
5 files changed, 579 insertions(+), 258 deletions(-)
diff --git a/pulsar-functions/instance/src/main/python/Function_pb2.py b/pulsar-functions/instance/src/main/python/Function_pb2.py
index 3ee58e9..14a33a1 100644
--- a/pulsar-functions/instance/src/main/python/Function_pb2.py
+++ b/pulsar-functions/instance/src/main/python/Function_pb2.py
@@ -38,7 +38,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
syntax='proto3',
serialized_options=b'\n!org.apache.pulsar.functions.protoB\010Function',
create_key=_descriptor._internal_create_key,
- serialized_pb=b'\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x03\"B\n\x0cRetryDetails\x12\x19\n\x11maxMessageRetries\x18\x01 \x01(\x05\x12\x17\n\x0f\x64\x65\x61\x64LetterTopic\x18\x02 \x01(\t\"\xe7\x05\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12\ [...]
+ serialized_pb=b'\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x03\"B\n\x0cRetryDetails\x12\x19\n\x11maxMessageRetries\x18\x01 \x01(\x05\x12\x17\n\x0f\x64\x65\x61\x64LetterTopic\x18\x02 \x01(\t\"\xa2\x06\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12\ [...]
)
_PROCESSINGGUARANTEES = _descriptor.EnumDescriptor(
@@ -66,8 +66,8 @@ _PROCESSINGGUARANTEES = _descriptor.EnumDescriptor(
],
containing_type=None,
serialized_options=None,
- serialized_start=3207,
- serialized_end=3286,
+ serialized_start=3685,
+ serialized_end=3764,
)
_sym_db.RegisterEnumDescriptor(_PROCESSINGGUARANTEES)
@@ -97,8 +97,8 @@ _SUBSCRIPTIONTYPE = _descriptor.EnumDescriptor(
],
containing_type=None,
serialized_options=None,
- serialized_start=3288,
- serialized_end=3348,
+ serialized_start=3766,
+ serialized_end=3826,
)
_sym_db.RegisterEnumDescriptor(_SUBSCRIPTIONTYPE)
@@ -123,8 +123,8 @@ _SUBSCRIPTIONPOSITION = _descriptor.EnumDescriptor(
],
containing_type=None,
serialized_options=None,
- serialized_start=3350,
- serialized_end=3398,
+ serialized_start=3828,
+ serialized_end=3876,
)
_sym_db.RegisterEnumDescriptor(_SUBSCRIPTIONPOSITION)
@@ -149,8 +149,8 @@ _FUNCTIONSTATE = _descriptor.EnumDescriptor(
],
containing_type=None,
serialized_options=None,
- serialized_start=3400,
- serialized_end=3441,
+ serialized_start=3878,
+ serialized_end=3919,
)
_sym_db.RegisterEnumDescriptor(_FUNCTIONSTATE)
@@ -192,8 +192,8 @@ _FUNCTIONDETAILS_RUNTIME = _descriptor.EnumDescriptor(
],
containing_type=None,
serialized_options=None,
- serialized_start=785,
- serialized_end=824,
+ serialized_start=844,
+ serialized_end=883,
)
_sym_db.RegisterEnumDescriptor(_FUNCTIONDETAILS_RUNTIME)
@@ -227,11 +227,46 @@ _FUNCTIONDETAILS_COMPONENTTYPE = _descriptor.EnumDescriptor(
],
containing_type=None,
serialized_options=None,
- serialized_start=826,
- serialized_end=890,
+ serialized_start=885,
+ serialized_end=949,
)
_sym_db.RegisterEnumDescriptor(_FUNCTIONDETAILS_COMPONENTTYPE)
+_CRYPTOSPEC_FAILUREACTION = _descriptor.EnumDescriptor(
+ name='FailureAction',
+ full_name='proto.CryptoSpec.FailureAction',
+ filename=None,
+ file=DESCRIPTOR,
+ create_key=_descriptor._internal_create_key,
+ values=[
+ _descriptor.EnumValueDescriptor(
+ name='FAIL', index=0, number=0,
+ serialized_options=None,
+ type=None,
+ create_key=_descriptor._internal_create_key),
+ _descriptor.EnumValueDescriptor(
+ name='DISCARD', index=1, number=1,
+ serialized_options=None,
+ type=None,
+ create_key=_descriptor._internal_create_key),
+ _descriptor.EnumValueDescriptor(
+ name='CONSUME', index=2, number=2,
+ serialized_options=None,
+ type=None,
+ create_key=_descriptor._internal_create_key),
+ _descriptor.EnumValueDescriptor(
+ name='SEND', index=3, number=10,
+ serialized_options=None,
+ type=None,
+ create_key=_descriptor._internal_create_key),
+ ],
+ containing_type=None,
+ serialized_options=None,
+ serialized_start=1873,
+ serialized_end=1934,
+)
+_sym_db.RegisterEnumDescriptor(_CRYPTOSPEC_FAILUREACTION)
+
_RESOURCES = _descriptor.Descriptor(
name='Resources',
@@ -480,6 +515,13 @@ _FUNCTIONDETAILS = _descriptor.Descriptor(
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
+ _descriptor.FieldDescriptor(
+ name='subscriptionPosition', full_name='proto.FunctionDetails.subscriptionPosition', index=22,
+ number=23, type=14, cpp_type=8, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
],
extensions=[
],
@@ -495,7 +537,7 @@ _FUNCTIONDETAILS = _descriptor.Descriptor(
oneofs=[
],
serialized_start=147,
- serialized_end=890,
+ serialized_end=949,
)
@@ -526,8 +568,8 @@ _CONSUMERSPEC_RECEIVERQUEUESIZE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=1185,
- serialized_end=1219,
+ serialized_start=1283,
+ serialized_end=1317,
)
_CONSUMERSPEC_SCHEMAPROPERTIESENTRY = _descriptor.Descriptor(
@@ -564,8 +606,8 @@ _CONSUMERSPEC_SCHEMAPROPERTIESENTRY = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=1221,
- serialized_end=1276,
+ serialized_start=1319,
+ serialized_end=1374,
)
_CONSUMERSPEC_CONSUMERPROPERTIESENTRY = _descriptor.Descriptor(
@@ -602,8 +644,8 @@ _CONSUMERSPEC_CONSUMERPROPERTIESENTRY = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=1278,
- serialized_end=1335,
+ serialized_start=1376,
+ serialized_end=1433,
)
_CONSUMERSPEC = _descriptor.Descriptor(
@@ -656,6 +698,13 @@ _CONSUMERSPEC = _descriptor.Descriptor(
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
+ _descriptor.FieldDescriptor(
+ name='cryptoSpec', full_name='proto.ConsumerSpec.cryptoSpec', index=6,
+ number=7, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
],
extensions=[
],
@@ -668,8 +717,8 @@ _CONSUMERSPEC = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=893,
- serialized_end=1335,
+ serialized_start=952,
+ serialized_end=1433,
)
@@ -702,6 +751,20 @@ _PRODUCERSPEC = _descriptor.Descriptor(
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
+ _descriptor.FieldDescriptor(
+ name='cryptoSpec', full_name='proto.ProducerSpec.cryptoSpec', index=3,
+ number=4, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
+ _descriptor.FieldDescriptor(
+ name='batchBuilder', full_name='proto.ProducerSpec.batchBuilder', index=4,
+ number=5, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=b"".decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
],
extensions=[
],
@@ -714,8 +777,69 @@ _PRODUCERSPEC = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=1337,
- serialized_end=1456,
+ serialized_start=1436,
+ serialized_end=1616,
+)
+
+
+_CRYPTOSPEC = _descriptor.Descriptor(
+ name='CryptoSpec',
+ full_name='proto.CryptoSpec',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ create_key=_descriptor._internal_create_key,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='cryptoKeyReaderClassName', full_name='proto.CryptoSpec.cryptoKeyReaderClassName', index=0,
+ number=1, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=b"".decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
+ _descriptor.FieldDescriptor(
+ name='cryptoKeyReaderConfig', full_name='proto.CryptoSpec.cryptoKeyReaderConfig', index=1,
+ number=2, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=b"".decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
+ _descriptor.FieldDescriptor(
+ name='producerEncryptionKeyName', full_name='proto.CryptoSpec.producerEncryptionKeyName', index=2,
+ number=3, type=9, cpp_type=9, label=3,
+ has_default_value=False, default_value=[],
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
+ _descriptor.FieldDescriptor(
+ name='producerCryptoFailureAction', full_name='proto.CryptoSpec.producerCryptoFailureAction', index=3,
+ number=4, type=14, cpp_type=8, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
+ _descriptor.FieldDescriptor(
+ name='consumerCryptoFailureAction', full_name='proto.CryptoSpec.consumerCryptoFailureAction', index=4,
+ number=5, type=14, cpp_type=8, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ _CRYPTOSPEC_FAILUREACTION,
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=1619,
+ serialized_end=1934,
)
@@ -753,8 +877,8 @@ _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=1936,
- serialized_end=1997,
+ serialized_start=2414,
+ serialized_end=2475,
)
_SOURCESPEC_INPUTSPECSENTRY = _descriptor.Descriptor(
@@ -791,8 +915,8 @@ _SOURCESPEC_INPUTSPECSENTRY = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=1999,
- serialized_end=2069,
+ serialized_start=2477,
+ serialized_end=2547,
)
_SOURCESPEC = _descriptor.Descriptor(
@@ -906,8 +1030,8 @@ _SOURCESPEC = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=1459,
- serialized_end=2069,
+ serialized_start=1937,
+ serialized_end=2547,
)
@@ -945,8 +1069,8 @@ _SINKSPEC_SCHEMAPROPERTIESENTRY = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=1221,
- serialized_end=1276,
+ serialized_start=1319,
+ serialized_end=1374,
)
_SINKSPEC_CONSUMERPROPERTIESENTRY = _descriptor.Descriptor(
@@ -983,8 +1107,8 @@ _SINKSPEC_CONSUMERPROPERTIESENTRY = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=1278,
- serialized_end=1335,
+ serialized_start=1376,
+ serialized_end=1433,
)
_SINKSPEC = _descriptor.Descriptor(
@@ -1084,8 +1208,8 @@ _SINKSPEC = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=2072,
- serialized_end=2548,
+ serialized_start=2550,
+ serialized_end=3026,
)
@@ -1123,8 +1247,8 @@ _PACKAGELOCATIONMETADATA = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=2550,
- serialized_end=2622,
+ serialized_start=3028,
+ serialized_end=3100,
)
@@ -1162,8 +1286,8 @@ _FUNCTIONMETADATA_INSTANCESTATESENTRY = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=2918,
- serialized_end=2993,
+ serialized_start=3396,
+ serialized_end=3471,
)
_FUNCTIONMETADATA = _descriptor.Descriptor(
@@ -1228,8 +1352,8 @@ _FUNCTIONMETADATA = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=2625,
- serialized_end=2993,
+ serialized_start=3103,
+ serialized_end=3471,
)
@@ -1267,8 +1391,8 @@ _FUNCTIONAUTHENTICATIONSPEC = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=2995,
- serialized_end=3055,
+ serialized_start=3473,
+ serialized_end=3533,
)
@@ -1306,8 +1430,8 @@ _INSTANCE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=3057,
- serialized_end=3138,
+ serialized_start=3535,
+ serialized_end=3616,
)
@@ -1345,8 +1469,8 @@ _ASSIGNMENT = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=3140,
- serialized_end=3205,
+ serialized_start=3618,
+ serialized_end=3683,
)
_FUNCTIONDETAILS.fields_by_name['processingGuarantees'].enum_type = _PROCESSINGGUARANTEES
@@ -1356,6 +1480,7 @@ _FUNCTIONDETAILS.fields_by_name['sink'].message_type = _SINKSPEC
_FUNCTIONDETAILS.fields_by_name['resources'].message_type = _RESOURCES
_FUNCTIONDETAILS.fields_by_name['retryDetails'].message_type = _RETRYDETAILS
_FUNCTIONDETAILS.fields_by_name['componentType'].enum_type = _FUNCTIONDETAILS_COMPONENTTYPE
+_FUNCTIONDETAILS.fields_by_name['subscriptionPosition'].enum_type = _SUBSCRIPTIONPOSITION
_FUNCTIONDETAILS_RUNTIME.containing_type = _FUNCTIONDETAILS
_FUNCTIONDETAILS_COMPONENTTYPE.containing_type = _FUNCTIONDETAILS
_CONSUMERSPEC_RECEIVERQUEUESIZE.containing_type = _CONSUMERSPEC
@@ -1364,6 +1489,11 @@ _CONSUMERSPEC_CONSUMERPROPERTIESENTRY.containing_type = _CONSUMERSPEC
_CONSUMERSPEC.fields_by_name['receiverQueueSize'].message_type = _CONSUMERSPEC_RECEIVERQUEUESIZE
_CONSUMERSPEC.fields_by_name['schemaProperties'].message_type = _CONSUMERSPEC_SCHEMAPROPERTIESENTRY
_CONSUMERSPEC.fields_by_name['consumerProperties'].message_type = _CONSUMERSPEC_CONSUMERPROPERTIESENTRY
+_CONSUMERSPEC.fields_by_name['cryptoSpec'].message_type = _CRYPTOSPEC
+_PRODUCERSPEC.fields_by_name['cryptoSpec'].message_type = _CRYPTOSPEC
+_CRYPTOSPEC.fields_by_name['producerCryptoFailureAction'].enum_type = _CRYPTOSPEC_FAILUREACTION
+_CRYPTOSPEC.fields_by_name['consumerCryptoFailureAction'].enum_type = _CRYPTOSPEC_FAILUREACTION
+_CRYPTOSPEC_FAILUREACTION.containing_type = _CRYPTOSPEC
_SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY.containing_type = _SOURCESPEC
_SOURCESPEC_INPUTSPECSENTRY.fields_by_name['value'].message_type = _CONSUMERSPEC
_SOURCESPEC_INPUTSPECSENTRY.containing_type = _SOURCESPEC
@@ -1389,6 +1519,7 @@ DESCRIPTOR.message_types_by_name['RetryDetails'] = _RETRYDETAILS
DESCRIPTOR.message_types_by_name['FunctionDetails'] = _FUNCTIONDETAILS
DESCRIPTOR.message_types_by_name['ConsumerSpec'] = _CONSUMERSPEC
DESCRIPTOR.message_types_by_name['ProducerSpec'] = _PRODUCERSPEC
+DESCRIPTOR.message_types_by_name['CryptoSpec'] = _CRYPTOSPEC
DESCRIPTOR.message_types_by_name['SourceSpec'] = _SOURCESPEC
DESCRIPTOR.message_types_by_name['SinkSpec'] = _SINKSPEC
DESCRIPTOR.message_types_by_name['PackageLocationMetaData'] = _PACKAGELOCATIONMETADATA
@@ -1461,6 +1592,13 @@ ProducerSpec = _reflection.GeneratedProtocolMessageType('ProducerSpec', (_messag
})
_sym_db.RegisterMessage(ProducerSpec)
+CryptoSpec = _reflection.GeneratedProtocolMessageType('CryptoSpec', (_message.Message,), {
+ 'DESCRIPTOR' : _CRYPTOSPEC,
+ '__module__' : 'Function_pb2'
+ # @@protoc_insertion_point(class_scope:proto.CryptoSpec)
+ })
+_sym_db.RegisterMessage(CryptoSpec)
+
SourceSpec = _reflection.GeneratedProtocolMessageType('SourceSpec', (_message.Message,), {
'TopicsToSerDeClassNameEntry' : _reflection.GeneratedProtocolMessageType('TopicsToSerDeClassNameEntry', (_message.Message,), {
diff --git a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py
index d665e98..d1ba3bb 100644
--- a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py
+++ b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py
@@ -19,14 +19,11 @@
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: InstanceCommunication.proto
-
-import sys
-_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
+"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
-from google.protobuf import descriptor_pb2
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
@@ -39,7 +36,9 @@ DESCRIPTOR = _descriptor.FileDescriptor(
name='InstanceCommunication.proto',
package='proto',
syntax='proto3',
- serialized_pb=_b('\n\x1bInstanceCommunication.proto\x12\x05proto\x1a\x1bgoogle/protobuf/empty.proto\"\xf6\x03\n\x0e\x46unctionStatus\x12\x0f\n\x07running\x18\x01 \x01(\x08\x12\x18\n\x10\x66\x61ilureException\x18\x02 \x01(\t\x12\x13\n\x0bnumRestarts\x18\x03 \x01(\x03\x12\x13\n\x0bnumReceived\x18\x11 \x01(\x03\x12 \n\x18numSuccessfullyProcessed\x18\x05 \x01(\x03\x12\x19\n\x11numUserExceptions\x18\x06 \x01(\x03\x12H\n\x14latestUserExceptions\x18\x07 \x03(\x0b\x32*.proto.FunctionStatus.Exc [...]
+ serialized_options=b'\n!org.apache.pulsar.functions.protoB\025InstanceCommunication',
+ create_key=_descriptor._internal_create_key,
+ serialized_pb=b'\n\x1bInstanceCommunication.proto\x12\x05proto\x1a\x1bgoogle/protobuf/empty.proto\"\xc4\x05\n\x0e\x46unctionStatus\x12\x0f\n\x07running\x18\x01 \x01(\x08\x12\x18\n\x10\x66\x61ilureException\x18\x02 \x01(\t\x12\x13\n\x0bnumRestarts\x18\x03 \x01(\x03\x12\x13\n\x0bnumReceived\x18\x11 \x01(\x03\x12 \n\x18numSuccessfullyProcessed\x18\x05 \x01(\x03\x12\x19\n\x11numUserExceptions\x18\x06 \x01(\x03\x12H\n\x14latestUserExceptions\x18\x07 \x03(\x0b\x32*.proto.FunctionStatus.Excep [...]
,
dependencies=[google_dot_protobuf_dot_empty__pb2.DESCRIPTOR,])
@@ -52,35 +51,36 @@ _FUNCTIONSTATUS_EXCEPTIONINFORMATION = _descriptor.Descriptor(
filename=None,
file=DESCRIPTOR,
containing_type=None,
+ create_key=_descriptor._internal_create_key,
fields=[
_descriptor.FieldDescriptor(
name='exceptionString', full_name='proto.FunctionStatus.ExceptionInformation.exceptionString', index=0,
number=1, type=9, cpp_type=9, label=1,
- has_default_value=False, default_value=_b("").decode('utf-8'),
+ has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='msSinceEpoch', full_name='proto.FunctionStatus.ExceptionInformation.msSinceEpoch', index=1,
number=2, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
],
extensions=[
],
nested_types=[],
enum_types=[
],
- options=None,
+ serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
- serialized_start=501,
- serialized_end=570,
+ serialized_start=707,
+ serialized_end=776,
)
_FUNCTIONSTATUS = _descriptor.Descriptor(
@@ -89,6 +89,7 @@ _FUNCTIONSTATUS = _descriptor.Descriptor(
filename=None,
file=DESCRIPTOR,
containing_type=None,
+ create_key=_descriptor._internal_create_key,
fields=[
_descriptor.FieldDescriptor(
name='running', full_name='proto.FunctionStatus.running', index=0,
@@ -96,105 +97,172 @@ _FUNCTIONSTATUS = _descriptor.Descriptor(
has_default_value=False, default_value=False,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='failureException', full_name='proto.FunctionStatus.failureException', index=1,
number=2, type=9, cpp_type=9, label=1,
- has_default_value=False, default_value=_b("").decode('utf-8'),
+ has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='numRestarts', full_name='proto.FunctionStatus.numRestarts', index=2,
number=3, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='numReceived', full_name='proto.FunctionStatus.numReceived', index=3,
number=17, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='numSuccessfullyProcessed', full_name='proto.FunctionStatus.numSuccessfullyProcessed', index=4,
number=5, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='numUserExceptions', full_name='proto.FunctionStatus.numUserExceptions', index=5,
number=6, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='latestUserExceptions', full_name='proto.FunctionStatus.latestUserExceptions', index=6,
number=7, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='numSystemExceptions', full_name='proto.FunctionStatus.numSystemExceptions', index=7,
number=8, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='latestSystemExceptions', full_name='proto.FunctionStatus.latestSystemExceptions', index=8,
number=9, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
+ _descriptor.FieldDescriptor(
+ name='numSourceExceptions', full_name='proto.FunctionStatus.numSourceExceptions', index=9,
+ number=18, type=3, cpp_type=2, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
+ _descriptor.FieldDescriptor(
+ name='latestSourceExceptions', full_name='proto.FunctionStatus.latestSourceExceptions', index=10,
+ number=19, type=11, cpp_type=10, label=3,
+ has_default_value=False, default_value=[],
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
- name='averageLatency', full_name='proto.FunctionStatus.averageLatency', index=9,
+ name='numSinkExceptions', full_name='proto.FunctionStatus.numSinkExceptions', index=11,
+ number=20, type=3, cpp_type=2, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
+ _descriptor.FieldDescriptor(
+ name='latestSinkExceptions', full_name='proto.FunctionStatus.latestSinkExceptions', index=12,
+ number=21, type=11, cpp_type=10, label=3,
+ has_default_value=False, default_value=[],
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
+ _descriptor.FieldDescriptor(
+ name='averageLatency', full_name='proto.FunctionStatus.averageLatency', index=13,
number=12, type=1, cpp_type=5, label=1,
has_default_value=False, default_value=float(0),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
- name='lastInvocationTime', full_name='proto.FunctionStatus.lastInvocationTime', index=10,
+ name='lastInvocationTime', full_name='proto.FunctionStatus.lastInvocationTime', index=14,
number=13, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
- name='instanceId', full_name='proto.FunctionStatus.instanceId', index=11,
+ name='instanceId', full_name='proto.FunctionStatus.instanceId', index=15,
number=14, type=9, cpp_type=9, label=1,
- has_default_value=False, default_value=_b("").decode('utf-8'),
+ has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
- name='workerId', full_name='proto.FunctionStatus.workerId', index=12,
+ name='workerId', full_name='proto.FunctionStatus.workerId', index=16,
number=16, type=9, cpp_type=9, label=1,
- has_default_value=False, default_value=_b("").decode('utf-8'),
+ has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
],
extensions=[
],
nested_types=[_FUNCTIONSTATUS_EXCEPTIONINFORMATION, ],
enum_types=[
],
- options=None,
+ serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=68,
- serialized_end=570,
+ serialized_end=776,
+)
+
+
+_FUNCTIONSTATUSLIST = _descriptor.Descriptor(
+ name='FunctionStatusList',
+ full_name='proto.FunctionStatusList',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ create_key=_descriptor._internal_create_key,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='error', full_name='proto.FunctionStatusList.error', index=0,
+ number=2, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=b"".decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
+ _descriptor.FieldDescriptor(
+ name='functionStatusList', full_name='proto.FunctionStatusList.functionStatusList', index=1,
+ number=1, type=11, cpp_type=10, label=3,
+ has_default_value=False, default_value=[],
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=778,
+ serialized_end=864,
)
@@ -204,35 +272,36 @@ _METRICSDATA_USERMETRICSENTRY = _descriptor.Descriptor(
filename=None,
file=DESCRIPTOR,
containing_type=None,
+ create_key=_descriptor._internal_create_key,
fields=[
_descriptor.FieldDescriptor(
name='key', full_name='proto.MetricsData.UserMetricsEntry.key', index=0,
number=1, type=9, cpp_type=9, label=1,
- has_default_value=False, default_value=_b("").decode('utf-8'),
+ has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='value', full_name='proto.MetricsData.UserMetricsEntry.value', index=1,
number=2, type=1, cpp_type=5, label=1,
has_default_value=False, default_value=float(0),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
],
extensions=[
],
nested_types=[],
enum_types=[
],
- options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')),
+ serialized_options=b'8\001',
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
- serialized_start=987,
- serialized_end=1037,
+ serialized_start=1281,
+ serialized_end=1331,
)
_METRICSDATA = _descriptor.Descriptor(
@@ -241,6 +310,7 @@ _METRICSDATA = _descriptor.Descriptor(
filename=None,
file=DESCRIPTOR,
containing_type=None,
+ create_key=_descriptor._internal_create_key,
fields=[
_descriptor.FieldDescriptor(
name='receivedTotal', full_name='proto.MetricsData.receivedTotal', index=0,
@@ -248,98 +318,98 @@ _METRICSDATA = _descriptor.Descriptor(
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='receivedTotal_1min', full_name='proto.MetricsData.receivedTotal_1min', index=1,
number=10, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='processedSuccessfullyTotal', full_name='proto.MetricsData.processedSuccessfullyTotal', index=2,
number=4, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='processedSuccessfullyTotal_1min', full_name='proto.MetricsData.processedSuccessfullyTotal_1min', index=3,
number=12, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='systemExceptionsTotal', full_name='proto.MetricsData.systemExceptionsTotal', index=4,
number=5, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='systemExceptionsTotal_1min', full_name='proto.MetricsData.systemExceptionsTotal_1min', index=5,
number=13, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='userExceptionsTotal', full_name='proto.MetricsData.userExceptionsTotal', index=6,
number=6, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='userExceptionsTotal_1min', full_name='proto.MetricsData.userExceptionsTotal_1min', index=7,
number=14, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='avgProcessLatency', full_name='proto.MetricsData.avgProcessLatency', index=8,
number=7, type=1, cpp_type=5, label=1,
has_default_value=False, default_value=float(0),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='avgProcessLatency_1min', full_name='proto.MetricsData.avgProcessLatency_1min', index=9,
number=15, type=1, cpp_type=5, label=1,
has_default_value=False, default_value=float(0),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='lastInvocation', full_name='proto.MetricsData.lastInvocation', index=10,
number=8, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='userMetrics', full_name='proto.MetricsData.userMetrics', index=11,
number=9, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
],
extensions=[
],
nested_types=[_METRICSDATA_USERMETRICSENTRY, ],
enum_types=[
],
- options=None,
+ serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
- serialized_start=573,
- serialized_end=1037,
+ serialized_start=867,
+ serialized_end=1331,
)
@@ -349,6 +419,7 @@ _HEALTHCHECKRESULT = _descriptor.Descriptor(
filename=None,
file=DESCRIPTOR,
containing_type=None,
+ create_key=_descriptor._internal_create_key,
fields=[
_descriptor.FieldDescriptor(
name='success', full_name='proto.HealthCheckResult.success', index=0,
@@ -356,21 +427,21 @@ _HEALTHCHECKRESULT = _descriptor.Descriptor(
has_default_value=False, default_value=False,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
],
extensions=[
],
nested_types=[],
enum_types=[
],
- options=None,
+ serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
- serialized_start=1039,
- serialized_end=1075,
+ serialized_start=1333,
+ serialized_end=1369,
)
@@ -380,42 +451,43 @@ _METRICS_INSTANCEMETRICS = _descriptor.Descriptor(
filename=None,
file=DESCRIPTOR,
containing_type=None,
+ create_key=_descriptor._internal_create_key,
fields=[
_descriptor.FieldDescriptor(
name='name', full_name='proto.Metrics.InstanceMetrics.name', index=0,
number=1, type=9, cpp_type=9, label=1,
- has_default_value=False, default_value=_b("").decode('utf-8'),
+ has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='instanceId', full_name='proto.Metrics.InstanceMetrics.instanceId', index=1,
number=2, type=5, cpp_type=1, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='metricsData', full_name='proto.Metrics.InstanceMetrics.metricsData', index=2,
number=3, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
],
extensions=[
],
nested_types=[],
enum_types=[
],
- options=None,
+ serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
- serialized_start=1138,
- serialized_end=1230,
+ serialized_start=1432,
+ serialized_end=1524,
)
_METRICS = _descriptor.Descriptor(
@@ -424,6 +496,7 @@ _METRICS = _descriptor.Descriptor(
filename=None,
file=DESCRIPTOR,
containing_type=None,
+ create_key=_descriptor._internal_create_key,
fields=[
_descriptor.FieldDescriptor(
name='metrics', full_name='proto.Metrics.metrics', index=0,
@@ -431,103 +504,113 @@ _METRICS = _descriptor.Descriptor(
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
],
extensions=[
],
nested_types=[_METRICS_INSTANCEMETRICS, ],
enum_types=[
],
- options=None,
+ serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
- serialized_start=1078,
- serialized_end=1230,
+ serialized_start=1372,
+ serialized_end=1524,
)
_FUNCTIONSTATUS_EXCEPTIONINFORMATION.containing_type = _FUNCTIONSTATUS
_FUNCTIONSTATUS.fields_by_name['latestUserExceptions'].message_type = _FUNCTIONSTATUS_EXCEPTIONINFORMATION
_FUNCTIONSTATUS.fields_by_name['latestSystemExceptions'].message_type = _FUNCTIONSTATUS_EXCEPTIONINFORMATION
+_FUNCTIONSTATUS.fields_by_name['latestSourceExceptions'].message_type = _FUNCTIONSTATUS_EXCEPTIONINFORMATION
+_FUNCTIONSTATUS.fields_by_name['latestSinkExceptions'].message_type = _FUNCTIONSTATUS_EXCEPTIONINFORMATION
+_FUNCTIONSTATUSLIST.fields_by_name['functionStatusList'].message_type = _FUNCTIONSTATUS
_METRICSDATA_USERMETRICSENTRY.containing_type = _METRICSDATA
_METRICSDATA.fields_by_name['userMetrics'].message_type = _METRICSDATA_USERMETRICSENTRY
_METRICS_INSTANCEMETRICS.fields_by_name['metricsData'].message_type = _METRICSDATA
_METRICS_INSTANCEMETRICS.containing_type = _METRICS
_METRICS.fields_by_name['metrics'].message_type = _METRICS_INSTANCEMETRICS
DESCRIPTOR.message_types_by_name['FunctionStatus'] = _FUNCTIONSTATUS
+DESCRIPTOR.message_types_by_name['FunctionStatusList'] = _FUNCTIONSTATUSLIST
DESCRIPTOR.message_types_by_name['MetricsData'] = _METRICSDATA
DESCRIPTOR.message_types_by_name['HealthCheckResult'] = _HEALTHCHECKRESULT
DESCRIPTOR.message_types_by_name['Metrics'] = _METRICS
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
-FunctionStatus = _reflection.GeneratedProtocolMessageType('FunctionStatus', (_message.Message,), dict(
+FunctionStatus = _reflection.GeneratedProtocolMessageType('FunctionStatus', (_message.Message,), {
- ExceptionInformation = _reflection.GeneratedProtocolMessageType('ExceptionInformation', (_message.Message,), dict(
- DESCRIPTOR = _FUNCTIONSTATUS_EXCEPTIONINFORMATION,
- __module__ = 'InstanceCommunication_pb2'
+ 'ExceptionInformation' : _reflection.GeneratedProtocolMessageType('ExceptionInformation', (_message.Message,), {
+ 'DESCRIPTOR' : _FUNCTIONSTATUS_EXCEPTIONINFORMATION,
+ '__module__' : 'InstanceCommunication_pb2'
# @@protoc_insertion_point(class_scope:proto.FunctionStatus.ExceptionInformation)
- ))
+ })
,
- DESCRIPTOR = _FUNCTIONSTATUS,
- __module__ = 'InstanceCommunication_pb2'
+ 'DESCRIPTOR' : _FUNCTIONSTATUS,
+ '__module__' : 'InstanceCommunication_pb2'
# @@protoc_insertion_point(class_scope:proto.FunctionStatus)
- ))
+ })
_sym_db.RegisterMessage(FunctionStatus)
_sym_db.RegisterMessage(FunctionStatus.ExceptionInformation)
-MetricsData = _reflection.GeneratedProtocolMessageType('MetricsData', (_message.Message,), dict(
+FunctionStatusList = _reflection.GeneratedProtocolMessageType('FunctionStatusList', (_message.Message,), {
+ 'DESCRIPTOR' : _FUNCTIONSTATUSLIST,
+ '__module__' : 'InstanceCommunication_pb2'
+ # @@protoc_insertion_point(class_scope:proto.FunctionStatusList)
+ })
+_sym_db.RegisterMessage(FunctionStatusList)
+
+MetricsData = _reflection.GeneratedProtocolMessageType('MetricsData', (_message.Message,), {
- UserMetricsEntry = _reflection.GeneratedProtocolMessageType('UserMetricsEntry', (_message.Message,), dict(
- DESCRIPTOR = _METRICSDATA_USERMETRICSENTRY,
- __module__ = 'InstanceCommunication_pb2'
+ 'UserMetricsEntry' : _reflection.GeneratedProtocolMessageType('UserMetricsEntry', (_message.Message,), {
+ 'DESCRIPTOR' : _METRICSDATA_USERMETRICSENTRY,
+ '__module__' : 'InstanceCommunication_pb2'
# @@protoc_insertion_point(class_scope:proto.MetricsData.UserMetricsEntry)
- ))
+ })
,
- DESCRIPTOR = _METRICSDATA,
- __module__ = 'InstanceCommunication_pb2'
+ 'DESCRIPTOR' : _METRICSDATA,
+ '__module__' : 'InstanceCommunication_pb2'
# @@protoc_insertion_point(class_scope:proto.MetricsData)
- ))
+ })
_sym_db.RegisterMessage(MetricsData)
_sym_db.RegisterMessage(MetricsData.UserMetricsEntry)
-HealthCheckResult = _reflection.GeneratedProtocolMessageType('HealthCheckResult', (_message.Message,), dict(
- DESCRIPTOR = _HEALTHCHECKRESULT,
- __module__ = 'InstanceCommunication_pb2'
+HealthCheckResult = _reflection.GeneratedProtocolMessageType('HealthCheckResult', (_message.Message,), {
+ 'DESCRIPTOR' : _HEALTHCHECKRESULT,
+ '__module__' : 'InstanceCommunication_pb2'
# @@protoc_insertion_point(class_scope:proto.HealthCheckResult)
- ))
+ })
_sym_db.RegisterMessage(HealthCheckResult)
-Metrics = _reflection.GeneratedProtocolMessageType('Metrics', (_message.Message,), dict(
+Metrics = _reflection.GeneratedProtocolMessageType('Metrics', (_message.Message,), {
- InstanceMetrics = _reflection.GeneratedProtocolMessageType('InstanceMetrics', (_message.Message,), dict(
- DESCRIPTOR = _METRICS_INSTANCEMETRICS,
- __module__ = 'InstanceCommunication_pb2'
+ 'InstanceMetrics' : _reflection.GeneratedProtocolMessageType('InstanceMetrics', (_message.Message,), {
+ 'DESCRIPTOR' : _METRICS_INSTANCEMETRICS,
+ '__module__' : 'InstanceCommunication_pb2'
# @@protoc_insertion_point(class_scope:proto.Metrics.InstanceMetrics)
- ))
+ })
,
- DESCRIPTOR = _METRICS,
- __module__ = 'InstanceCommunication_pb2'
+ 'DESCRIPTOR' : _METRICS,
+ '__module__' : 'InstanceCommunication_pb2'
# @@protoc_insertion_point(class_scope:proto.Metrics)
- ))
+ })
_sym_db.RegisterMessage(Metrics)
_sym_db.RegisterMessage(Metrics.InstanceMetrics)
-DESCRIPTOR.has_options = True
-DESCRIPTOR._options = _descriptor._ParseOptions(descriptor_pb2.FileOptions(), _b('\n!org.apache.pulsar.functions.protoB\025InstanceCommunication'))
-_METRICSDATA_USERMETRICSENTRY.has_options = True
-_METRICSDATA_USERMETRICSENTRY._options = _descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001'))
+DESCRIPTOR._options = None
+_METRICSDATA_USERMETRICSENTRY._options = None
_INSTANCECONTROL = _descriptor.ServiceDescriptor(
name='InstanceControl',
full_name='proto.InstanceControl',
file=DESCRIPTOR,
index=0,
- options=None,
- serialized_start=1233,
- serialized_end=1581,
+ serialized_options=None,
+ create_key=_descriptor._internal_create_key,
+ serialized_start=1527,
+ serialized_end=1875,
methods=[
_descriptor.MethodDescriptor(
name='GetFunctionStatus',
@@ -536,7 +619,8 @@ _INSTANCECONTROL = _descriptor.ServiceDescriptor(
containing_service=None,
input_type=google_dot_protobuf_dot_empty__pb2._EMPTY,
output_type=_FUNCTIONSTATUS,
- options=None,
+ serialized_options=None,
+ create_key=_descriptor._internal_create_key,
),
_descriptor.MethodDescriptor(
name='GetAndResetMetrics',
@@ -545,7 +629,8 @@ _INSTANCECONTROL = _descriptor.ServiceDescriptor(
containing_service=None,
input_type=google_dot_protobuf_dot_empty__pb2._EMPTY,
output_type=_METRICSDATA,
- options=None,
+ serialized_options=None,
+ create_key=_descriptor._internal_create_key,
),
_descriptor.MethodDescriptor(
name='ResetMetrics',
@@ -554,7 +639,8 @@ _INSTANCECONTROL = _descriptor.ServiceDescriptor(
containing_service=None,
input_type=google_dot_protobuf_dot_empty__pb2._EMPTY,
output_type=google_dot_protobuf_dot_empty__pb2._EMPTY,
- options=None,
+ serialized_options=None,
+ create_key=_descriptor._internal_create_key,
),
_descriptor.MethodDescriptor(
name='GetMetrics',
@@ -563,7 +649,8 @@ _INSTANCECONTROL = _descriptor.ServiceDescriptor(
containing_service=None,
input_type=google_dot_protobuf_dot_empty__pb2._EMPTY,
output_type=_METRICSDATA,
- options=None,
+ serialized_options=None,
+ create_key=_descriptor._internal_create_key,
),
_descriptor.MethodDescriptor(
name='HealthCheck',
@@ -572,7 +659,8 @@ _INSTANCECONTROL = _descriptor.ServiceDescriptor(
containing_service=None,
input_type=google_dot_protobuf_dot_empty__pb2._EMPTY,
output_type=_HEALTHCHECKRESULT,
- options=None,
+ serialized_options=None,
+ create_key=_descriptor._internal_create_key,
),
])
_sym_db.RegisterServiceDescriptor(_INSTANCECONTROL)
diff --git a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2_grpc.py b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2_grpc.py
index 21730e1..e895f53 100644
--- a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2_grpc.py
+++ b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2_grpc.py
@@ -18,6 +18,7 @@
#
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
+"""Client and server classes corresponding to protobuf-defined services."""
import grpc
import InstanceCommunication_pb2 as InstanceCommunication__pb2
@@ -25,110 +26,193 @@ from google.protobuf import empty_pb2 as google_dot_protobuf_dot_empty__pb2
class InstanceControlStub(object):
- # missing associated documentation comment in .proto file
- pass
-
- def __init__(self, channel):
- """Constructor.
-
- Args:
- channel: A grpc.Channel.
- """
- self.GetFunctionStatus = channel.unary_unary(
- '/proto.InstanceControl/GetFunctionStatus',
- request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
- response_deserializer=InstanceCommunication__pb2.FunctionStatus.FromString,
- )
- self.GetAndResetMetrics = channel.unary_unary(
- '/proto.InstanceControl/GetAndResetMetrics',
- request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
- response_deserializer=InstanceCommunication__pb2.MetricsData.FromString,
- )
- self.ResetMetrics = channel.unary_unary(
- '/proto.InstanceControl/ResetMetrics',
- request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
- response_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
- )
- self.GetMetrics = channel.unary_unary(
- '/proto.InstanceControl/GetMetrics',
- request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
- response_deserializer=InstanceCommunication__pb2.MetricsData.FromString,
- )
- self.HealthCheck = channel.unary_unary(
- '/proto.InstanceControl/HealthCheck',
- request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
- response_deserializer=InstanceCommunication__pb2.HealthCheckResult.FromString,
- )
+ """Missing associated documentation comment in .proto file."""
+
+ def __init__(self, channel):
+ """Constructor.
+
+ Args:
+ channel: A grpc.Channel.
+ """
+ self.GetFunctionStatus = channel.unary_unary(
+ '/proto.InstanceControl/GetFunctionStatus',
+ request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
+ response_deserializer=InstanceCommunication__pb2.FunctionStatus.FromString,
+ )
+ self.GetAndResetMetrics = channel.unary_unary(
+ '/proto.InstanceControl/GetAndResetMetrics',
+ request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
+ response_deserializer=InstanceCommunication__pb2.MetricsData.FromString,
+ )
+ self.ResetMetrics = channel.unary_unary(
+ '/proto.InstanceControl/ResetMetrics',
+ request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
+ response_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
+ )
+ self.GetMetrics = channel.unary_unary(
+ '/proto.InstanceControl/GetMetrics',
+ request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
+ response_deserializer=InstanceCommunication__pb2.MetricsData.FromString,
+ )
+ self.HealthCheck = channel.unary_unary(
+ '/proto.InstanceControl/HealthCheck',
+ request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
+ response_deserializer=InstanceCommunication__pb2.HealthCheckResult.FromString,
+ )
class InstanceControlServicer(object):
- # missing associated documentation comment in .proto file
- pass
-
- def GetFunctionStatus(self, request, context):
- # missing associated documentation comment in .proto file
- pass
- context.set_code(grpc.StatusCode.UNIMPLEMENTED)
- context.set_details('Method not implemented!')
- raise NotImplementedError('Method not implemented!')
-
- def GetAndResetMetrics(self, request, context):
- # missing associated documentation comment in .proto file
- pass
- context.set_code(grpc.StatusCode.UNIMPLEMENTED)
- context.set_details('Method not implemented!')
- raise NotImplementedError('Method not implemented!')
-
- def ResetMetrics(self, request, context):
- # missing associated documentation comment in .proto file
- pass
- context.set_code(grpc.StatusCode.UNIMPLEMENTED)
- context.set_details('Method not implemented!')
- raise NotImplementedError('Method not implemented!')
-
- def GetMetrics(self, request, context):
- # missing associated documentation comment in .proto file
- pass
- context.set_code(grpc.StatusCode.UNIMPLEMENTED)
- context.set_details('Method not implemented!')
- raise NotImplementedError('Method not implemented!')
-
- def HealthCheck(self, request, context):
- # missing associated documentation comment in .proto file
- pass
- context.set_code(grpc.StatusCode.UNIMPLEMENTED)
- context.set_details('Method not implemented!')
- raise NotImplementedError('Method not implemented!')
+ """Missing associated documentation comment in .proto file."""
+
+ def GetFunctionStatus(self, request, context):
+ """Missing associated documentation comment in .proto file."""
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+ context.set_details('Method not implemented!')
+ raise NotImplementedError('Method not implemented!')
+
+ def GetAndResetMetrics(self, request, context):
+ """Missing associated documentation comment in .proto file."""
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+ context.set_details('Method not implemented!')
+ raise NotImplementedError('Method not implemented!')
+
+ def ResetMetrics(self, request, context):
+ """Missing associated documentation comment in .proto file."""
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+ context.set_details('Method not implemented!')
+ raise NotImplementedError('Method not implemented!')
+
+ def GetMetrics(self, request, context):
+ """Missing associated documentation comment in .proto file."""
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+ context.set_details('Method not implemented!')
+ raise NotImplementedError('Method not implemented!')
+
+ def HealthCheck(self, request, context):
+ """Missing associated documentation comment in .proto file."""
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+ context.set_details('Method not implemented!')
+ raise NotImplementedError('Method not implemented!')
def add_InstanceControlServicer_to_server(servicer, server):
- rpc_method_handlers = {
- 'GetFunctionStatus': grpc.unary_unary_rpc_method_handler(
- servicer.GetFunctionStatus,
- request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
- response_serializer=InstanceCommunication__pb2.FunctionStatus.SerializeToString,
- ),
- 'GetAndResetMetrics': grpc.unary_unary_rpc_method_handler(
- servicer.GetAndResetMetrics,
- request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
- response_serializer=InstanceCommunication__pb2.MetricsData.SerializeToString,
- ),
- 'ResetMetrics': grpc.unary_unary_rpc_method_handler(
- servicer.ResetMetrics,
- request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
- response_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
- ),
- 'GetMetrics': grpc.unary_unary_rpc_method_handler(
- servicer.GetMetrics,
- request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
- response_serializer=InstanceCommunication__pb2.MetricsData.SerializeToString,
- ),
- 'HealthCheck': grpc.unary_unary_rpc_method_handler(
- servicer.HealthCheck,
- request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
- response_serializer=InstanceCommunication__pb2.HealthCheckResult.SerializeToString,
- ),
- }
- generic_handler = grpc.method_handlers_generic_handler(
- 'proto.InstanceControl', rpc_method_handlers)
- server.add_generic_rpc_handlers((generic_handler,))
+ rpc_method_handlers = {
+ 'GetFunctionStatus': grpc.unary_unary_rpc_method_handler(
+ servicer.GetFunctionStatus,
+ request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
+ response_serializer=InstanceCommunication__pb2.FunctionStatus.SerializeToString,
+ ),
+ 'GetAndResetMetrics': grpc.unary_unary_rpc_method_handler(
+ servicer.GetAndResetMetrics,
+ request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
+ response_serializer=InstanceCommunication__pb2.MetricsData.SerializeToString,
+ ),
+ 'ResetMetrics': grpc.unary_unary_rpc_method_handler(
+ servicer.ResetMetrics,
+ request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
+ response_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
+ ),
+ 'GetMetrics': grpc.unary_unary_rpc_method_handler(
+ servicer.GetMetrics,
+ request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
+ response_serializer=InstanceCommunication__pb2.MetricsData.SerializeToString,
+ ),
+ 'HealthCheck': grpc.unary_unary_rpc_method_handler(
+ servicer.HealthCheck,
+ request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
+ response_serializer=InstanceCommunication__pb2.HealthCheckResult.SerializeToString,
+ ),
+ }
+ generic_handler = grpc.method_handlers_generic_handler(
+ 'proto.InstanceControl', rpc_method_handlers)
+ server.add_generic_rpc_handlers((generic_handler,))
+
+
+ # This class is part of an EXPERIMENTAL API.
+class InstanceControl(object):
+ """Missing associated documentation comment in .proto file."""
+
+ @staticmethod
+ def GetFunctionStatus(request,
+ target,
+ options=(),
+ channel_credentials=None,
+ call_credentials=None,
+ insecure=False,
+ compression=None,
+ wait_for_ready=None,
+ timeout=None,
+ metadata=None):
+ return grpc.experimental.unary_unary(request, target, '/proto.InstanceControl/GetFunctionStatus',
+ google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
+ InstanceCommunication__pb2.FunctionStatus.FromString,
+ options, channel_credentials,
+ insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
+
+ @staticmethod
+ def GetAndResetMetrics(request,
+ target,
+ options=(),
+ channel_credentials=None,
+ call_credentials=None,
+ insecure=False,
+ compression=None,
+ wait_for_ready=None,
+ timeout=None,
+ metadata=None):
+ return grpc.experimental.unary_unary(request, target, '/proto.InstanceControl/GetAndResetMetrics',
+ google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
+ InstanceCommunication__pb2.MetricsData.FromString,
+ options, channel_credentials,
+ insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
+
+ @staticmethod
+ def ResetMetrics(request,
+ target,
+ options=(),
+ channel_credentials=None,
+ call_credentials=None,
+ insecure=False,
+ compression=None,
+ wait_for_ready=None,
+ timeout=None,
+ metadata=None):
+ return grpc.experimental.unary_unary(request, target, '/proto.InstanceControl/ResetMetrics',
+ google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
+ google_dot_protobuf_dot_empty__pb2.Empty.FromString,
+ options, channel_credentials,
+ insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
+
+ @staticmethod
+ def GetMetrics(request,
+ target,
+ options=(),
+ channel_credentials=None,
+ call_credentials=None,
+ insecure=False,
+ compression=None,
+ wait_for_ready=None,
+ timeout=None,
+ metadata=None):
+ return grpc.experimental.unary_unary(request, target, '/proto.InstanceControl/GetMetrics',
+ google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
+ InstanceCommunication__pb2.MetricsData.FromString,
+ options, channel_credentials,
+ insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
+
+ @staticmethod
+ def HealthCheck(request,
+ target,
+ options=(),
+ channel_credentials=None,
+ call_credentials=None,
+ insecure=False,
+ compression=None,
+ wait_for_ready=None,
+ timeout=None,
+ metadata=None):
+ return grpc.experimental.unary_unary(request, target, '/proto.InstanceControl/HealthCheck',
+ google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
+ InstanceCommunication__pb2.HealthCheckResult.FromString,
+ options, channel_credentials,
+ insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py
index d6ff4fb..fecde7a 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -309,10 +309,18 @@ class PythonInstance(object):
len(self.instance_config.function_details.sink.topic) > 0:
Log.debug("Setting up producer for topic %s" % self.instance_config.function_details.sink.topic)
+ batch_type = pulsar.BatchingType.Default
+ if self.instance_config.function_details.sink.producerSpec.batchBuilder != None and \
+ len(self.instance_config.function_details.sink.producerSpec.batchBuilder) > 0:
+ batch_builder = self.instance_config.function_details.sink.producerSpec.batchBuilder
+ if batch_builder == "KEY_BASED":
+ batch_type = pulsar.BatchingType.KeyBased
+
self.producer = self.pulsar_client.create_producer(
str(self.instance_config.function_details.sink.topic),
block_if_queue_full=True,
batching_enabled=True,
+ batching_type=batch_type,
batching_max_publish_delay_ms=10,
compression_type=pulsar.CompressionType.LZ4,
# set send timeout to be infinity to prevent potential deadlock with consumer
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 723cd46..d95d1d3 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -26,6 +26,7 @@ import net.jodah.failsafe.RetryPolicy;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
@@ -981,6 +982,8 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
@Cleanup Producer<byte[]> producer = client.newProducer(Schema.BYTES)
.topic(inputTopicName)
+ .enableBatching(true)
+ .batcherBuilder(BatcherBuilder.DEFAULT)
.create();
for (int i = 0; i < numMessages; i++) {