You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/11/22 01:57:13 UTC

[pulsar] branch master updated: Support key base for python function (#8540)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 334dffd  Support key base for python function (#8540)
334dffd is described below

commit 334dffdb95035349cc0fdbc9f84a15d2422a825b
Author: xiaolong ran <rx...@apache.org>
AuthorDate: Sun Nov 22 09:56:45 2020 +0800

    Support key base for python function (#8540)
    
    ### Motivation
    
    This change is based on #8523.
    
    ### Modifications
    
    Support key_base for Python functions.
---
 .../instance/src/main/python/Function_pb2.py       | 234 ++++++++++++----
 .../src/main/python/InstanceCommunication_pb2.py   | 306 +++++++++++++--------
 .../main/python/InstanceCommunication_pb2_grpc.py  | 286 ++++++++++++-------
 .../instance/src/main/python/python_instance.py    |   8 +
 .../integration/functions/PulsarFunctionsTest.java |   3 +
 5 files changed, 579 insertions(+), 258 deletions(-)

diff --git a/pulsar-functions/instance/src/main/python/Function_pb2.py b/pulsar-functions/instance/src/main/python/Function_pb2.py
index 3ee58e9..14a33a1 100644
--- a/pulsar-functions/instance/src/main/python/Function_pb2.py
+++ b/pulsar-functions/instance/src/main/python/Function_pb2.py
@@ -38,7 +38,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
   syntax='proto3',
   serialized_options=b'\n!org.apache.pulsar.functions.protoB\010Function',
   create_key=_descriptor._internal_create_key,
-  serialized_pb=b'\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x03\"B\n\x0cRetryDetails\x12\x19\n\x11maxMessageRetries\x18\x01 \x01(\x05\x12\x17\n\x0f\x64\x65\x61\x64LetterTopic\x18\x02 \x01(\t\"\xe7\x05\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12\ [...]
+  serialized_pb=b'\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x03\"B\n\x0cRetryDetails\x12\x19\n\x11maxMessageRetries\x18\x01 \x01(\x05\x12\x17\n\x0f\x64\x65\x61\x64LetterTopic\x18\x02 \x01(\t\"\xa2\x06\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12\ [...]
 )
 
 _PROCESSINGGUARANTEES = _descriptor.EnumDescriptor(
@@ -66,8 +66,8 @@ _PROCESSINGGUARANTEES = _descriptor.EnumDescriptor(
   ],
   containing_type=None,
   serialized_options=None,
-  serialized_start=3207,
-  serialized_end=3286,
+  serialized_start=3685,
+  serialized_end=3764,
 )
 _sym_db.RegisterEnumDescriptor(_PROCESSINGGUARANTEES)
 
@@ -97,8 +97,8 @@ _SUBSCRIPTIONTYPE = _descriptor.EnumDescriptor(
   ],
   containing_type=None,
   serialized_options=None,
-  serialized_start=3288,
-  serialized_end=3348,
+  serialized_start=3766,
+  serialized_end=3826,
 )
 _sym_db.RegisterEnumDescriptor(_SUBSCRIPTIONTYPE)
 
@@ -123,8 +123,8 @@ _SUBSCRIPTIONPOSITION = _descriptor.EnumDescriptor(
   ],
   containing_type=None,
   serialized_options=None,
-  serialized_start=3350,
-  serialized_end=3398,
+  serialized_start=3828,
+  serialized_end=3876,
 )
 _sym_db.RegisterEnumDescriptor(_SUBSCRIPTIONPOSITION)
 
@@ -149,8 +149,8 @@ _FUNCTIONSTATE = _descriptor.EnumDescriptor(
   ],
   containing_type=None,
   serialized_options=None,
-  serialized_start=3400,
-  serialized_end=3441,
+  serialized_start=3878,
+  serialized_end=3919,
 )
 _sym_db.RegisterEnumDescriptor(_FUNCTIONSTATE)
 
@@ -192,8 +192,8 @@ _FUNCTIONDETAILS_RUNTIME = _descriptor.EnumDescriptor(
   ],
   containing_type=None,
   serialized_options=None,
-  serialized_start=785,
-  serialized_end=824,
+  serialized_start=844,
+  serialized_end=883,
 )
 _sym_db.RegisterEnumDescriptor(_FUNCTIONDETAILS_RUNTIME)
 
@@ -227,11 +227,46 @@ _FUNCTIONDETAILS_COMPONENTTYPE = _descriptor.EnumDescriptor(
   ],
   containing_type=None,
   serialized_options=None,
-  serialized_start=826,
-  serialized_end=890,
+  serialized_start=885,
+  serialized_end=949,
 )
 _sym_db.RegisterEnumDescriptor(_FUNCTIONDETAILS_COMPONENTTYPE)
 
+_CRYPTOSPEC_FAILUREACTION = _descriptor.EnumDescriptor(
+  name='FailureAction',
+  full_name='proto.CryptoSpec.FailureAction',
+  filename=None,
+  file=DESCRIPTOR,
+  create_key=_descriptor._internal_create_key,
+  values=[
+    _descriptor.EnumValueDescriptor(
+      name='FAIL', index=0, number=0,
+      serialized_options=None,
+      type=None,
+      create_key=_descriptor._internal_create_key),
+    _descriptor.EnumValueDescriptor(
+      name='DISCARD', index=1, number=1,
+      serialized_options=None,
+      type=None,
+      create_key=_descriptor._internal_create_key),
+    _descriptor.EnumValueDescriptor(
+      name='CONSUME', index=2, number=2,
+      serialized_options=None,
+      type=None,
+      create_key=_descriptor._internal_create_key),
+    _descriptor.EnumValueDescriptor(
+      name='SEND', index=3, number=10,
+      serialized_options=None,
+      type=None,
+      create_key=_descriptor._internal_create_key),
+  ],
+  containing_type=None,
+  serialized_options=None,
+  serialized_start=1873,
+  serialized_end=1934,
+)
+_sym_db.RegisterEnumDescriptor(_CRYPTOSPEC_FAILUREACTION)
+
 
 _RESOURCES = _descriptor.Descriptor(
   name='Resources',
@@ -480,6 +515,13 @@ _FUNCTIONDETAILS = _descriptor.Descriptor(
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
+    _descriptor.FieldDescriptor(
+      name='subscriptionPosition', full_name='proto.FunctionDetails.subscriptionPosition', index=22,
+      number=23, type=14, cpp_type=8, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
   ],
   extensions=[
   ],
@@ -495,7 +537,7 @@ _FUNCTIONDETAILS = _descriptor.Descriptor(
   oneofs=[
   ],
   serialized_start=147,
-  serialized_end=890,
+  serialized_end=949,
 )
 
 
@@ -526,8 +568,8 @@ _CONSUMERSPEC_RECEIVERQUEUESIZE = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1185,
-  serialized_end=1219,
+  serialized_start=1283,
+  serialized_end=1317,
 )
 
 _CONSUMERSPEC_SCHEMAPROPERTIESENTRY = _descriptor.Descriptor(
@@ -564,8 +606,8 @@ _CONSUMERSPEC_SCHEMAPROPERTIESENTRY = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1221,
-  serialized_end=1276,
+  serialized_start=1319,
+  serialized_end=1374,
 )
 
 _CONSUMERSPEC_CONSUMERPROPERTIESENTRY = _descriptor.Descriptor(
@@ -602,8 +644,8 @@ _CONSUMERSPEC_CONSUMERPROPERTIESENTRY = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1278,
-  serialized_end=1335,
+  serialized_start=1376,
+  serialized_end=1433,
 )
 
 _CONSUMERSPEC = _descriptor.Descriptor(
@@ -656,6 +698,13 @@ _CONSUMERSPEC = _descriptor.Descriptor(
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
+    _descriptor.FieldDescriptor(
+      name='cryptoSpec', full_name='proto.ConsumerSpec.cryptoSpec', index=6,
+      number=7, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
   ],
   extensions=[
   ],
@@ -668,8 +717,8 @@ _CONSUMERSPEC = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=893,
-  serialized_end=1335,
+  serialized_start=952,
+  serialized_end=1433,
 )
 
 
@@ -702,6 +751,20 @@ _PRODUCERSPEC = _descriptor.Descriptor(
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
+    _descriptor.FieldDescriptor(
+      name='cryptoSpec', full_name='proto.ProducerSpec.cryptoSpec', index=3,
+      number=4, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
+    _descriptor.FieldDescriptor(
+      name='batchBuilder', full_name='proto.ProducerSpec.batchBuilder', index=4,
+      number=5, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=b"".decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
   ],
   extensions=[
   ],
@@ -714,8 +777,69 @@ _PRODUCERSPEC = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1337,
-  serialized_end=1456,
+  serialized_start=1436,
+  serialized_end=1616,
+)
+
+
+_CRYPTOSPEC = _descriptor.Descriptor(
+  name='CryptoSpec',
+  full_name='proto.CryptoSpec',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  create_key=_descriptor._internal_create_key,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='cryptoKeyReaderClassName', full_name='proto.CryptoSpec.cryptoKeyReaderClassName', index=0,
+      number=1, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=b"".decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
+    _descriptor.FieldDescriptor(
+      name='cryptoKeyReaderConfig', full_name='proto.CryptoSpec.cryptoKeyReaderConfig', index=1,
+      number=2, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=b"".decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
+    _descriptor.FieldDescriptor(
+      name='producerEncryptionKeyName', full_name='proto.CryptoSpec.producerEncryptionKeyName', index=2,
+      number=3, type=9, cpp_type=9, label=3,
+      has_default_value=False, default_value=[],
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
+    _descriptor.FieldDescriptor(
+      name='producerCryptoFailureAction', full_name='proto.CryptoSpec.producerCryptoFailureAction', index=3,
+      number=4, type=14, cpp_type=8, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
+    _descriptor.FieldDescriptor(
+      name='consumerCryptoFailureAction', full_name='proto.CryptoSpec.consumerCryptoFailureAction', index=4,
+      number=5, type=14, cpp_type=8, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+    _CRYPTOSPEC_FAILUREACTION,
+  ],
+  serialized_options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=1619,
+  serialized_end=1934,
 )
 
 
@@ -753,8 +877,8 @@ _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1936,
-  serialized_end=1997,
+  serialized_start=2414,
+  serialized_end=2475,
 )
 
 _SOURCESPEC_INPUTSPECSENTRY = _descriptor.Descriptor(
@@ -791,8 +915,8 @@ _SOURCESPEC_INPUTSPECSENTRY = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1999,
-  serialized_end=2069,
+  serialized_start=2477,
+  serialized_end=2547,
 )
 
 _SOURCESPEC = _descriptor.Descriptor(
@@ -906,8 +1030,8 @@ _SOURCESPEC = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1459,
-  serialized_end=2069,
+  serialized_start=1937,
+  serialized_end=2547,
 )
 
 
@@ -945,8 +1069,8 @@ _SINKSPEC_SCHEMAPROPERTIESENTRY = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1221,
-  serialized_end=1276,
+  serialized_start=1319,
+  serialized_end=1374,
 )
 
 _SINKSPEC_CONSUMERPROPERTIESENTRY = _descriptor.Descriptor(
@@ -983,8 +1107,8 @@ _SINKSPEC_CONSUMERPROPERTIESENTRY = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1278,
-  serialized_end=1335,
+  serialized_start=1376,
+  serialized_end=1433,
 )
 
 _SINKSPEC = _descriptor.Descriptor(
@@ -1084,8 +1208,8 @@ _SINKSPEC = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=2072,
-  serialized_end=2548,
+  serialized_start=2550,
+  serialized_end=3026,
 )
 
 
@@ -1123,8 +1247,8 @@ _PACKAGELOCATIONMETADATA = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=2550,
-  serialized_end=2622,
+  serialized_start=3028,
+  serialized_end=3100,
 )
 
 
@@ -1162,8 +1286,8 @@ _FUNCTIONMETADATA_INSTANCESTATESENTRY = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=2918,
-  serialized_end=2993,
+  serialized_start=3396,
+  serialized_end=3471,
 )
 
 _FUNCTIONMETADATA = _descriptor.Descriptor(
@@ -1228,8 +1352,8 @@ _FUNCTIONMETADATA = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=2625,
-  serialized_end=2993,
+  serialized_start=3103,
+  serialized_end=3471,
 )
 
 
@@ -1267,8 +1391,8 @@ _FUNCTIONAUTHENTICATIONSPEC = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=2995,
-  serialized_end=3055,
+  serialized_start=3473,
+  serialized_end=3533,
 )
 
 
@@ -1306,8 +1430,8 @@ _INSTANCE = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=3057,
-  serialized_end=3138,
+  serialized_start=3535,
+  serialized_end=3616,
 )
 
 
@@ -1345,8 +1469,8 @@ _ASSIGNMENT = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=3140,
-  serialized_end=3205,
+  serialized_start=3618,
+  serialized_end=3683,
 )
 
 _FUNCTIONDETAILS.fields_by_name['processingGuarantees'].enum_type = _PROCESSINGGUARANTEES
@@ -1356,6 +1480,7 @@ _FUNCTIONDETAILS.fields_by_name['sink'].message_type = _SINKSPEC
 _FUNCTIONDETAILS.fields_by_name['resources'].message_type = _RESOURCES
 _FUNCTIONDETAILS.fields_by_name['retryDetails'].message_type = _RETRYDETAILS
 _FUNCTIONDETAILS.fields_by_name['componentType'].enum_type = _FUNCTIONDETAILS_COMPONENTTYPE
+_FUNCTIONDETAILS.fields_by_name['subscriptionPosition'].enum_type = _SUBSCRIPTIONPOSITION
 _FUNCTIONDETAILS_RUNTIME.containing_type = _FUNCTIONDETAILS
 _FUNCTIONDETAILS_COMPONENTTYPE.containing_type = _FUNCTIONDETAILS
 _CONSUMERSPEC_RECEIVERQUEUESIZE.containing_type = _CONSUMERSPEC
@@ -1364,6 +1489,11 @@ _CONSUMERSPEC_CONSUMERPROPERTIESENTRY.containing_type = _CONSUMERSPEC
 _CONSUMERSPEC.fields_by_name['receiverQueueSize'].message_type = _CONSUMERSPEC_RECEIVERQUEUESIZE
 _CONSUMERSPEC.fields_by_name['schemaProperties'].message_type = _CONSUMERSPEC_SCHEMAPROPERTIESENTRY
 _CONSUMERSPEC.fields_by_name['consumerProperties'].message_type = _CONSUMERSPEC_CONSUMERPROPERTIESENTRY
+_CONSUMERSPEC.fields_by_name['cryptoSpec'].message_type = _CRYPTOSPEC
+_PRODUCERSPEC.fields_by_name['cryptoSpec'].message_type = _CRYPTOSPEC
+_CRYPTOSPEC.fields_by_name['producerCryptoFailureAction'].enum_type = _CRYPTOSPEC_FAILUREACTION
+_CRYPTOSPEC.fields_by_name['consumerCryptoFailureAction'].enum_type = _CRYPTOSPEC_FAILUREACTION
+_CRYPTOSPEC_FAILUREACTION.containing_type = _CRYPTOSPEC
 _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY.containing_type = _SOURCESPEC
 _SOURCESPEC_INPUTSPECSENTRY.fields_by_name['value'].message_type = _CONSUMERSPEC
 _SOURCESPEC_INPUTSPECSENTRY.containing_type = _SOURCESPEC
@@ -1389,6 +1519,7 @@ DESCRIPTOR.message_types_by_name['RetryDetails'] = _RETRYDETAILS
 DESCRIPTOR.message_types_by_name['FunctionDetails'] = _FUNCTIONDETAILS
 DESCRIPTOR.message_types_by_name['ConsumerSpec'] = _CONSUMERSPEC
 DESCRIPTOR.message_types_by_name['ProducerSpec'] = _PRODUCERSPEC
+DESCRIPTOR.message_types_by_name['CryptoSpec'] = _CRYPTOSPEC
 DESCRIPTOR.message_types_by_name['SourceSpec'] = _SOURCESPEC
 DESCRIPTOR.message_types_by_name['SinkSpec'] = _SINKSPEC
 DESCRIPTOR.message_types_by_name['PackageLocationMetaData'] = _PACKAGELOCATIONMETADATA
@@ -1461,6 +1592,13 @@ ProducerSpec = _reflection.GeneratedProtocolMessageType('ProducerSpec', (_messag
   })
 _sym_db.RegisterMessage(ProducerSpec)
 
+CryptoSpec = _reflection.GeneratedProtocolMessageType('CryptoSpec', (_message.Message,), {
+  'DESCRIPTOR' : _CRYPTOSPEC,
+  '__module__' : 'Function_pb2'
+  # @@protoc_insertion_point(class_scope:proto.CryptoSpec)
+  })
+_sym_db.RegisterMessage(CryptoSpec)
+
 SourceSpec = _reflection.GeneratedProtocolMessageType('SourceSpec', (_message.Message,), {
 
   'TopicsToSerDeClassNameEntry' : _reflection.GeneratedProtocolMessageType('TopicsToSerDeClassNameEntry', (_message.Message,), {
diff --git a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py
index d665e98..d1ba3bb 100644
--- a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py
+++ b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py
@@ -19,14 +19,11 @@
 
 # Generated by the protocol buffer compiler.  DO NOT EDIT!
 # source: InstanceCommunication.proto
-
-import sys
-_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
+"""Generated protocol buffer code."""
 from google.protobuf import descriptor as _descriptor
 from google.protobuf import message as _message
 from google.protobuf import reflection as _reflection
 from google.protobuf import symbol_database as _symbol_database
-from google.protobuf import descriptor_pb2
 # @@protoc_insertion_point(imports)
 
 _sym_db = _symbol_database.Default()
@@ -39,7 +36,9 @@ DESCRIPTOR = _descriptor.FileDescriptor(
   name='InstanceCommunication.proto',
   package='proto',
   syntax='proto3',
-  serialized_pb=_b('\n\x1bInstanceCommunication.proto\x12\x05proto\x1a\x1bgoogle/protobuf/empty.proto\"\xf6\x03\n\x0e\x46unctionStatus\x12\x0f\n\x07running\x18\x01 \x01(\x08\x12\x18\n\x10\x66\x61ilureException\x18\x02 \x01(\t\x12\x13\n\x0bnumRestarts\x18\x03 \x01(\x03\x12\x13\n\x0bnumReceived\x18\x11 \x01(\x03\x12 \n\x18numSuccessfullyProcessed\x18\x05 \x01(\x03\x12\x19\n\x11numUserExceptions\x18\x06 \x01(\x03\x12H\n\x14latestUserExceptions\x18\x07 \x03(\x0b\x32*.proto.FunctionStatus.Exc [...]
+  serialized_options=b'\n!org.apache.pulsar.functions.protoB\025InstanceCommunication',
+  create_key=_descriptor._internal_create_key,
+  serialized_pb=b'\n\x1bInstanceCommunication.proto\x12\x05proto\x1a\x1bgoogle/protobuf/empty.proto\"\xc4\x05\n\x0e\x46unctionStatus\x12\x0f\n\x07running\x18\x01 \x01(\x08\x12\x18\n\x10\x66\x61ilureException\x18\x02 \x01(\t\x12\x13\n\x0bnumRestarts\x18\x03 \x01(\x03\x12\x13\n\x0bnumReceived\x18\x11 \x01(\x03\x12 \n\x18numSuccessfullyProcessed\x18\x05 \x01(\x03\x12\x19\n\x11numUserExceptions\x18\x06 \x01(\x03\x12H\n\x14latestUserExceptions\x18\x07 \x03(\x0b\x32*.proto.FunctionStatus.Excep [...]
   ,
   dependencies=[google_dot_protobuf_dot_empty__pb2.DESCRIPTOR,])
 
@@ -52,35 +51,36 @@ _FUNCTIONSTATUS_EXCEPTIONINFORMATION = _descriptor.Descriptor(
   filename=None,
   file=DESCRIPTOR,
   containing_type=None,
+  create_key=_descriptor._internal_create_key,
   fields=[
     _descriptor.FieldDescriptor(
       name='exceptionString', full_name='proto.FunctionStatus.ExceptionInformation.exceptionString', index=0,
       number=1, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='msSinceEpoch', full_name='proto.FunctionStatus.ExceptionInformation.msSinceEpoch', index=1,
       number=2, type=3, cpp_type=2, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
   ],
   extensions=[
   ],
   nested_types=[],
   enum_types=[
   ],
-  options=None,
+  serialized_options=None,
   is_extendable=False,
   syntax='proto3',
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=501,
-  serialized_end=570,
+  serialized_start=707,
+  serialized_end=776,
 )
 
 _FUNCTIONSTATUS = _descriptor.Descriptor(
@@ -89,6 +89,7 @@ _FUNCTIONSTATUS = _descriptor.Descriptor(
   filename=None,
   file=DESCRIPTOR,
   containing_type=None,
+  create_key=_descriptor._internal_create_key,
   fields=[
     _descriptor.FieldDescriptor(
       name='running', full_name='proto.FunctionStatus.running', index=0,
@@ -96,105 +97,172 @@ _FUNCTIONSTATUS = _descriptor.Descriptor(
       has_default_value=False, default_value=False,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='failureException', full_name='proto.FunctionStatus.failureException', index=1,
       number=2, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='numRestarts', full_name='proto.FunctionStatus.numRestarts', index=2,
       number=3, type=3, cpp_type=2, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='numReceived', full_name='proto.FunctionStatus.numReceived', index=3,
       number=17, type=3, cpp_type=2, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='numSuccessfullyProcessed', full_name='proto.FunctionStatus.numSuccessfullyProcessed', index=4,
       number=5, type=3, cpp_type=2, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='numUserExceptions', full_name='proto.FunctionStatus.numUserExceptions', index=5,
       number=6, type=3, cpp_type=2, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='latestUserExceptions', full_name='proto.FunctionStatus.latestUserExceptions', index=6,
       number=7, type=11, cpp_type=10, label=3,
       has_default_value=False, default_value=[],
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='numSystemExceptions', full_name='proto.FunctionStatus.numSystemExceptions', index=7,
       number=8, type=3, cpp_type=2, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='latestSystemExceptions', full_name='proto.FunctionStatus.latestSystemExceptions', index=8,
       number=9, type=11, cpp_type=10, label=3,
       has_default_value=False, default_value=[],
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
+    _descriptor.FieldDescriptor(
+      name='numSourceExceptions', full_name='proto.FunctionStatus.numSourceExceptions', index=9,
+      number=18, type=3, cpp_type=2, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
+    _descriptor.FieldDescriptor(
+      name='latestSourceExceptions', full_name='proto.FunctionStatus.latestSourceExceptions', index=10,
+      number=19, type=11, cpp_type=10, label=3,
+      has_default_value=False, default_value=[],
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
-      name='averageLatency', full_name='proto.FunctionStatus.averageLatency', index=9,
+      name='numSinkExceptions', full_name='proto.FunctionStatus.numSinkExceptions', index=11,
+      number=20, type=3, cpp_type=2, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
+    _descriptor.FieldDescriptor(
+      name='latestSinkExceptions', full_name='proto.FunctionStatus.latestSinkExceptions', index=12,
+      number=21, type=11, cpp_type=10, label=3,
+      has_default_value=False, default_value=[],
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
+    _descriptor.FieldDescriptor(
+      name='averageLatency', full_name='proto.FunctionStatus.averageLatency', index=13,
       number=12, type=1, cpp_type=5, label=1,
       has_default_value=False, default_value=float(0),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
-      name='lastInvocationTime', full_name='proto.FunctionStatus.lastInvocationTime', index=10,
+      name='lastInvocationTime', full_name='proto.FunctionStatus.lastInvocationTime', index=14,
       number=13, type=3, cpp_type=2, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
-      name='instanceId', full_name='proto.FunctionStatus.instanceId', index=11,
+      name='instanceId', full_name='proto.FunctionStatus.instanceId', index=15,
       number=14, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
-      name='workerId', full_name='proto.FunctionStatus.workerId', index=12,
+      name='workerId', full_name='proto.FunctionStatus.workerId', index=16,
       number=16, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
   ],
   extensions=[
   ],
   nested_types=[_FUNCTIONSTATUS_EXCEPTIONINFORMATION, ],
   enum_types=[
   ],
-  options=None,
+  serialized_options=None,
   is_extendable=False,
   syntax='proto3',
   extension_ranges=[],
   oneofs=[
   ],
   serialized_start=68,
-  serialized_end=570,
+  serialized_end=776,
+)
+
+
+_FUNCTIONSTATUSLIST = _descriptor.Descriptor(
+  name='FunctionStatusList',
+  full_name='proto.FunctionStatusList',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  create_key=_descriptor._internal_create_key,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='error', full_name='proto.FunctionStatusList.error', index=0,
+      number=2, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=b"".decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
+    _descriptor.FieldDescriptor(
+      name='functionStatusList', full_name='proto.FunctionStatusList.functionStatusList', index=1,
+      number=1, type=11, cpp_type=10, label=3,
+      has_default_value=False, default_value=[],
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  serialized_options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=778,
+  serialized_end=864,
 )
 
 
@@ -204,35 +272,36 @@ _METRICSDATA_USERMETRICSENTRY = _descriptor.Descriptor(
   filename=None,
   file=DESCRIPTOR,
   containing_type=None,
+  create_key=_descriptor._internal_create_key,
   fields=[
     _descriptor.FieldDescriptor(
       name='key', full_name='proto.MetricsData.UserMetricsEntry.key', index=0,
       number=1, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='value', full_name='proto.MetricsData.UserMetricsEntry.value', index=1,
       number=2, type=1, cpp_type=5, label=1,
       has_default_value=False, default_value=float(0),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
   ],
   extensions=[
   ],
   nested_types=[],
   enum_types=[
   ],
-  options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')),
+  serialized_options=b'8\001',
   is_extendable=False,
   syntax='proto3',
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=987,
-  serialized_end=1037,
+  serialized_start=1281,
+  serialized_end=1331,
 )
 
 _METRICSDATA = _descriptor.Descriptor(
@@ -241,6 +310,7 @@ _METRICSDATA = _descriptor.Descriptor(
   filename=None,
   file=DESCRIPTOR,
   containing_type=None,
+  create_key=_descriptor._internal_create_key,
   fields=[
     _descriptor.FieldDescriptor(
       name='receivedTotal', full_name='proto.MetricsData.receivedTotal', index=0,
@@ -248,98 +318,98 @@ _METRICSDATA = _descriptor.Descriptor(
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='receivedTotal_1min', full_name='proto.MetricsData.receivedTotal_1min', index=1,
       number=10, type=3, cpp_type=2, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='processedSuccessfullyTotal', full_name='proto.MetricsData.processedSuccessfullyTotal', index=2,
       number=4, type=3, cpp_type=2, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='processedSuccessfullyTotal_1min', full_name='proto.MetricsData.processedSuccessfullyTotal_1min', index=3,
       number=12, type=3, cpp_type=2, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='systemExceptionsTotal', full_name='proto.MetricsData.systemExceptionsTotal', index=4,
       number=5, type=3, cpp_type=2, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='systemExceptionsTotal_1min', full_name='proto.MetricsData.systemExceptionsTotal_1min', index=5,
       number=13, type=3, cpp_type=2, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='userExceptionsTotal', full_name='proto.MetricsData.userExceptionsTotal', index=6,
       number=6, type=3, cpp_type=2, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='userExceptionsTotal_1min', full_name='proto.MetricsData.userExceptionsTotal_1min', index=7,
       number=14, type=3, cpp_type=2, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='avgProcessLatency', full_name='proto.MetricsData.avgProcessLatency', index=8,
       number=7, type=1, cpp_type=5, label=1,
       has_default_value=False, default_value=float(0),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='avgProcessLatency_1min', full_name='proto.MetricsData.avgProcessLatency_1min', index=9,
       number=15, type=1, cpp_type=5, label=1,
       has_default_value=False, default_value=float(0),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='lastInvocation', full_name='proto.MetricsData.lastInvocation', index=10,
       number=8, type=3, cpp_type=2, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='userMetrics', full_name='proto.MetricsData.userMetrics', index=11,
       number=9, type=11, cpp_type=10, label=3,
       has_default_value=False, default_value=[],
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
   ],
   extensions=[
   ],
   nested_types=[_METRICSDATA_USERMETRICSENTRY, ],
   enum_types=[
   ],
-  options=None,
+  serialized_options=None,
   is_extendable=False,
   syntax='proto3',
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=573,
-  serialized_end=1037,
+  serialized_start=867,
+  serialized_end=1331,
 )
 
 
@@ -349,6 +419,7 @@ _HEALTHCHECKRESULT = _descriptor.Descriptor(
   filename=None,
   file=DESCRIPTOR,
   containing_type=None,
+  create_key=_descriptor._internal_create_key,
   fields=[
     _descriptor.FieldDescriptor(
       name='success', full_name='proto.HealthCheckResult.success', index=0,
@@ -356,21 +427,21 @@ _HEALTHCHECKRESULT = _descriptor.Descriptor(
       has_default_value=False, default_value=False,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
   ],
   extensions=[
   ],
   nested_types=[],
   enum_types=[
   ],
-  options=None,
+  serialized_options=None,
   is_extendable=False,
   syntax='proto3',
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1039,
-  serialized_end=1075,
+  serialized_start=1333,
+  serialized_end=1369,
 )
 
 
@@ -380,42 +451,43 @@ _METRICS_INSTANCEMETRICS = _descriptor.Descriptor(
   filename=None,
   file=DESCRIPTOR,
   containing_type=None,
+  create_key=_descriptor._internal_create_key,
   fields=[
     _descriptor.FieldDescriptor(
       name='name', full_name='proto.Metrics.InstanceMetrics.name', index=0,
       number=1, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='instanceId', full_name='proto.Metrics.InstanceMetrics.instanceId', index=1,
       number=2, type=5, cpp_type=1, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='metricsData', full_name='proto.Metrics.InstanceMetrics.metricsData', index=2,
       number=3, type=11, cpp_type=10, label=1,
       has_default_value=False, default_value=None,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
   ],
   extensions=[
   ],
   nested_types=[],
   enum_types=[
   ],
-  options=None,
+  serialized_options=None,
   is_extendable=False,
   syntax='proto3',
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1138,
-  serialized_end=1230,
+  serialized_start=1432,
+  serialized_end=1524,
 )
 
 _METRICS = _descriptor.Descriptor(
@@ -424,6 +496,7 @@ _METRICS = _descriptor.Descriptor(
   filename=None,
   file=DESCRIPTOR,
   containing_type=None,
+  create_key=_descriptor._internal_create_key,
   fields=[
     _descriptor.FieldDescriptor(
       name='metrics', full_name='proto.Metrics.metrics', index=0,
@@ -431,103 +504,113 @@ _METRICS = _descriptor.Descriptor(
       has_default_value=False, default_value=[],
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
   ],
   extensions=[
   ],
   nested_types=[_METRICS_INSTANCEMETRICS, ],
   enum_types=[
   ],
-  options=None,
+  serialized_options=None,
   is_extendable=False,
   syntax='proto3',
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1078,
-  serialized_end=1230,
+  serialized_start=1372,
+  serialized_end=1524,
 )
 
 _FUNCTIONSTATUS_EXCEPTIONINFORMATION.containing_type = _FUNCTIONSTATUS
 _FUNCTIONSTATUS.fields_by_name['latestUserExceptions'].message_type = _FUNCTIONSTATUS_EXCEPTIONINFORMATION
 _FUNCTIONSTATUS.fields_by_name['latestSystemExceptions'].message_type = _FUNCTIONSTATUS_EXCEPTIONINFORMATION
+_FUNCTIONSTATUS.fields_by_name['latestSourceExceptions'].message_type = _FUNCTIONSTATUS_EXCEPTIONINFORMATION
+_FUNCTIONSTATUS.fields_by_name['latestSinkExceptions'].message_type = _FUNCTIONSTATUS_EXCEPTIONINFORMATION
+_FUNCTIONSTATUSLIST.fields_by_name['functionStatusList'].message_type = _FUNCTIONSTATUS
 _METRICSDATA_USERMETRICSENTRY.containing_type = _METRICSDATA
 _METRICSDATA.fields_by_name['userMetrics'].message_type = _METRICSDATA_USERMETRICSENTRY
 _METRICS_INSTANCEMETRICS.fields_by_name['metricsData'].message_type = _METRICSDATA
 _METRICS_INSTANCEMETRICS.containing_type = _METRICS
 _METRICS.fields_by_name['metrics'].message_type = _METRICS_INSTANCEMETRICS
 DESCRIPTOR.message_types_by_name['FunctionStatus'] = _FUNCTIONSTATUS
+DESCRIPTOR.message_types_by_name['FunctionStatusList'] = _FUNCTIONSTATUSLIST
 DESCRIPTOR.message_types_by_name['MetricsData'] = _METRICSDATA
 DESCRIPTOR.message_types_by_name['HealthCheckResult'] = _HEALTHCHECKRESULT
 DESCRIPTOR.message_types_by_name['Metrics'] = _METRICS
 _sym_db.RegisterFileDescriptor(DESCRIPTOR)
 
-FunctionStatus = _reflection.GeneratedProtocolMessageType('FunctionStatus', (_message.Message,), dict(
+FunctionStatus = _reflection.GeneratedProtocolMessageType('FunctionStatus', (_message.Message,), {
 
-  ExceptionInformation = _reflection.GeneratedProtocolMessageType('ExceptionInformation', (_message.Message,), dict(
-    DESCRIPTOR = _FUNCTIONSTATUS_EXCEPTIONINFORMATION,
-    __module__ = 'InstanceCommunication_pb2'
+  'ExceptionInformation' : _reflection.GeneratedProtocolMessageType('ExceptionInformation', (_message.Message,), {
+    'DESCRIPTOR' : _FUNCTIONSTATUS_EXCEPTIONINFORMATION,
+    '__module__' : 'InstanceCommunication_pb2'
     # @@protoc_insertion_point(class_scope:proto.FunctionStatus.ExceptionInformation)
-    ))
+    })
   ,
-  DESCRIPTOR = _FUNCTIONSTATUS,
-  __module__ = 'InstanceCommunication_pb2'
+  'DESCRIPTOR' : _FUNCTIONSTATUS,
+  '__module__' : 'InstanceCommunication_pb2'
   # @@protoc_insertion_point(class_scope:proto.FunctionStatus)
-  ))
+  })
 _sym_db.RegisterMessage(FunctionStatus)
 _sym_db.RegisterMessage(FunctionStatus.ExceptionInformation)
 
-MetricsData = _reflection.GeneratedProtocolMessageType('MetricsData', (_message.Message,), dict(
+FunctionStatusList = _reflection.GeneratedProtocolMessageType('FunctionStatusList', (_message.Message,), {
+  'DESCRIPTOR' : _FUNCTIONSTATUSLIST,
+  '__module__' : 'InstanceCommunication_pb2'
+  # @@protoc_insertion_point(class_scope:proto.FunctionStatusList)
+  })
+_sym_db.RegisterMessage(FunctionStatusList)
+
+MetricsData = _reflection.GeneratedProtocolMessageType('MetricsData', (_message.Message,), {
 
-  UserMetricsEntry = _reflection.GeneratedProtocolMessageType('UserMetricsEntry', (_message.Message,), dict(
-    DESCRIPTOR = _METRICSDATA_USERMETRICSENTRY,
-    __module__ = 'InstanceCommunication_pb2'
+  'UserMetricsEntry' : _reflection.GeneratedProtocolMessageType('UserMetricsEntry', (_message.Message,), {
+    'DESCRIPTOR' : _METRICSDATA_USERMETRICSENTRY,
+    '__module__' : 'InstanceCommunication_pb2'
     # @@protoc_insertion_point(class_scope:proto.MetricsData.UserMetricsEntry)
-    ))
+    })
   ,
-  DESCRIPTOR = _METRICSDATA,
-  __module__ = 'InstanceCommunication_pb2'
+  'DESCRIPTOR' : _METRICSDATA,
+  '__module__' : 'InstanceCommunication_pb2'
   # @@protoc_insertion_point(class_scope:proto.MetricsData)
-  ))
+  })
 _sym_db.RegisterMessage(MetricsData)
 _sym_db.RegisterMessage(MetricsData.UserMetricsEntry)
 
-HealthCheckResult = _reflection.GeneratedProtocolMessageType('HealthCheckResult', (_message.Message,), dict(
-  DESCRIPTOR = _HEALTHCHECKRESULT,
-  __module__ = 'InstanceCommunication_pb2'
+HealthCheckResult = _reflection.GeneratedProtocolMessageType('HealthCheckResult', (_message.Message,), {
+  'DESCRIPTOR' : _HEALTHCHECKRESULT,
+  '__module__' : 'InstanceCommunication_pb2'
   # @@protoc_insertion_point(class_scope:proto.HealthCheckResult)
-  ))
+  })
 _sym_db.RegisterMessage(HealthCheckResult)
 
-Metrics = _reflection.GeneratedProtocolMessageType('Metrics', (_message.Message,), dict(
+Metrics = _reflection.GeneratedProtocolMessageType('Metrics', (_message.Message,), {
 
-  InstanceMetrics = _reflection.GeneratedProtocolMessageType('InstanceMetrics', (_message.Message,), dict(
-    DESCRIPTOR = _METRICS_INSTANCEMETRICS,
-    __module__ = 'InstanceCommunication_pb2'
+  'InstanceMetrics' : _reflection.GeneratedProtocolMessageType('InstanceMetrics', (_message.Message,), {
+    'DESCRIPTOR' : _METRICS_INSTANCEMETRICS,
+    '__module__' : 'InstanceCommunication_pb2'
     # @@protoc_insertion_point(class_scope:proto.Metrics.InstanceMetrics)
-    ))
+    })
   ,
-  DESCRIPTOR = _METRICS,
-  __module__ = 'InstanceCommunication_pb2'
+  'DESCRIPTOR' : _METRICS,
+  '__module__' : 'InstanceCommunication_pb2'
   # @@protoc_insertion_point(class_scope:proto.Metrics)
-  ))
+  })
 _sym_db.RegisterMessage(Metrics)
 _sym_db.RegisterMessage(Metrics.InstanceMetrics)
 
 
-DESCRIPTOR.has_options = True
-DESCRIPTOR._options = _descriptor._ParseOptions(descriptor_pb2.FileOptions(), _b('\n!org.apache.pulsar.functions.protoB\025InstanceCommunication'))
-_METRICSDATA_USERMETRICSENTRY.has_options = True
-_METRICSDATA_USERMETRICSENTRY._options = _descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001'))
+DESCRIPTOR._options = None
+_METRICSDATA_USERMETRICSENTRY._options = None
 
 _INSTANCECONTROL = _descriptor.ServiceDescriptor(
   name='InstanceControl',
   full_name='proto.InstanceControl',
   file=DESCRIPTOR,
   index=0,
-  options=None,
-  serialized_start=1233,
-  serialized_end=1581,
+  serialized_options=None,
+  create_key=_descriptor._internal_create_key,
+  serialized_start=1527,
+  serialized_end=1875,
   methods=[
   _descriptor.MethodDescriptor(
     name='GetFunctionStatus',
@@ -536,7 +619,8 @@ _INSTANCECONTROL = _descriptor.ServiceDescriptor(
     containing_service=None,
     input_type=google_dot_protobuf_dot_empty__pb2._EMPTY,
     output_type=_FUNCTIONSTATUS,
-    options=None,
+    serialized_options=None,
+    create_key=_descriptor._internal_create_key,
   ),
   _descriptor.MethodDescriptor(
     name='GetAndResetMetrics',
@@ -545,7 +629,8 @@ _INSTANCECONTROL = _descriptor.ServiceDescriptor(
     containing_service=None,
     input_type=google_dot_protobuf_dot_empty__pb2._EMPTY,
     output_type=_METRICSDATA,
-    options=None,
+    serialized_options=None,
+    create_key=_descriptor._internal_create_key,
   ),
   _descriptor.MethodDescriptor(
     name='ResetMetrics',
@@ -554,7 +639,8 @@ _INSTANCECONTROL = _descriptor.ServiceDescriptor(
     containing_service=None,
     input_type=google_dot_protobuf_dot_empty__pb2._EMPTY,
     output_type=google_dot_protobuf_dot_empty__pb2._EMPTY,
-    options=None,
+    serialized_options=None,
+    create_key=_descriptor._internal_create_key,
   ),
   _descriptor.MethodDescriptor(
     name='GetMetrics',
@@ -563,7 +649,8 @@ _INSTANCECONTROL = _descriptor.ServiceDescriptor(
     containing_service=None,
     input_type=google_dot_protobuf_dot_empty__pb2._EMPTY,
     output_type=_METRICSDATA,
-    options=None,
+    serialized_options=None,
+    create_key=_descriptor._internal_create_key,
   ),
   _descriptor.MethodDescriptor(
     name='HealthCheck',
@@ -572,7 +659,8 @@ _INSTANCECONTROL = _descriptor.ServiceDescriptor(
     containing_service=None,
     input_type=google_dot_protobuf_dot_empty__pb2._EMPTY,
     output_type=_HEALTHCHECKRESULT,
-    options=None,
+    serialized_options=None,
+    create_key=_descriptor._internal_create_key,
   ),
 ])
 _sym_db.RegisterServiceDescriptor(_INSTANCECONTROL)
diff --git a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2_grpc.py b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2_grpc.py
index 21730e1..e895f53 100644
--- a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2_grpc.py
+++ b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2_grpc.py
@@ -18,6 +18,7 @@
 #
 
 # Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
+"""Client and server classes corresponding to protobuf-defined services."""
 import grpc
 
 import InstanceCommunication_pb2 as InstanceCommunication__pb2
@@ -25,110 +26,193 @@ from google.protobuf import empty_pb2 as google_dot_protobuf_dot_empty__pb2
 
 
 class InstanceControlStub(object):
-  # missing associated documentation comment in .proto file
-  pass
-
-  def __init__(self, channel):
-    """Constructor.
-
-    Args:
-      channel: A grpc.Channel.
-    """
-    self.GetFunctionStatus = channel.unary_unary(
-        '/proto.InstanceControl/GetFunctionStatus',
-        request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
-        response_deserializer=InstanceCommunication__pb2.FunctionStatus.FromString,
-        )
-    self.GetAndResetMetrics = channel.unary_unary(
-        '/proto.InstanceControl/GetAndResetMetrics',
-        request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
-        response_deserializer=InstanceCommunication__pb2.MetricsData.FromString,
-        )
-    self.ResetMetrics = channel.unary_unary(
-        '/proto.InstanceControl/ResetMetrics',
-        request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
-        response_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
-        )
-    self.GetMetrics = channel.unary_unary(
-        '/proto.InstanceControl/GetMetrics',
-        request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
-        response_deserializer=InstanceCommunication__pb2.MetricsData.FromString,
-        )
-    self.HealthCheck = channel.unary_unary(
-        '/proto.InstanceControl/HealthCheck',
-        request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
-        response_deserializer=InstanceCommunication__pb2.HealthCheckResult.FromString,
-        )
+    """Missing associated documentation comment in .proto file."""
+
+    def __init__(self, channel):
+        """Constructor.
+
+        Args:
+            channel: A grpc.Channel.
+        """
+        self.GetFunctionStatus = channel.unary_unary(
+                '/proto.InstanceControl/GetFunctionStatus',
+                request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
+                response_deserializer=InstanceCommunication__pb2.FunctionStatus.FromString,
+                )
+        self.GetAndResetMetrics = channel.unary_unary(
+                '/proto.InstanceControl/GetAndResetMetrics',
+                request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
+                response_deserializer=InstanceCommunication__pb2.MetricsData.FromString,
+                )
+        self.ResetMetrics = channel.unary_unary(
+                '/proto.InstanceControl/ResetMetrics',
+                request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
+                response_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
+                )
+        self.GetMetrics = channel.unary_unary(
+                '/proto.InstanceControl/GetMetrics',
+                request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
+                response_deserializer=InstanceCommunication__pb2.MetricsData.FromString,
+                )
+        self.HealthCheck = channel.unary_unary(
+                '/proto.InstanceControl/HealthCheck',
+                request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
+                response_deserializer=InstanceCommunication__pb2.HealthCheckResult.FromString,
+                )
 
 
 class InstanceControlServicer(object):
-  # missing associated documentation comment in .proto file
-  pass
-
-  def GetFunctionStatus(self, request, context):
-    # missing associated documentation comment in .proto file
-    pass
-    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
-    context.set_details('Method not implemented!')
-    raise NotImplementedError('Method not implemented!')
-
-  def GetAndResetMetrics(self, request, context):
-    # missing associated documentation comment in .proto file
-    pass
-    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
-    context.set_details('Method not implemented!')
-    raise NotImplementedError('Method not implemented!')
-
-  def ResetMetrics(self, request, context):
-    # missing associated documentation comment in .proto file
-    pass
-    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
-    context.set_details('Method not implemented!')
-    raise NotImplementedError('Method not implemented!')
-
-  def GetMetrics(self, request, context):
-    # missing associated documentation comment in .proto file
-    pass
-    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
-    context.set_details('Method not implemented!')
-    raise NotImplementedError('Method not implemented!')
-
-  def HealthCheck(self, request, context):
-    # missing associated documentation comment in .proto file
-    pass
-    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
-    context.set_details('Method not implemented!')
-    raise NotImplementedError('Method not implemented!')
+    """Missing associated documentation comment in .proto file."""
+
+    def GetFunctionStatus(self, request, context):
+        """Missing associated documentation comment in .proto file."""
+        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+        context.set_details('Method not implemented!')
+        raise NotImplementedError('Method not implemented!')
+
+    def GetAndResetMetrics(self, request, context):
+        """Missing associated documentation comment in .proto file."""
+        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+        context.set_details('Method not implemented!')
+        raise NotImplementedError('Method not implemented!')
+
+    def ResetMetrics(self, request, context):
+        """Missing associated documentation comment in .proto file."""
+        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+        context.set_details('Method not implemented!')
+        raise NotImplementedError('Method not implemented!')
+
+    def GetMetrics(self, request, context):
+        """Missing associated documentation comment in .proto file."""
+        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+        context.set_details('Method not implemented!')
+        raise NotImplementedError('Method not implemented!')
+
+    def HealthCheck(self, request, context):
+        """Missing associated documentation comment in .proto file."""
+        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+        context.set_details('Method not implemented!')
+        raise NotImplementedError('Method not implemented!')
 
 
 def add_InstanceControlServicer_to_server(servicer, server):
-  rpc_method_handlers = {
-      'GetFunctionStatus': grpc.unary_unary_rpc_method_handler(
-          servicer.GetFunctionStatus,
-          request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
-          response_serializer=InstanceCommunication__pb2.FunctionStatus.SerializeToString,
-      ),
-      'GetAndResetMetrics': grpc.unary_unary_rpc_method_handler(
-          servicer.GetAndResetMetrics,
-          request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
-          response_serializer=InstanceCommunication__pb2.MetricsData.SerializeToString,
-      ),
-      'ResetMetrics': grpc.unary_unary_rpc_method_handler(
-          servicer.ResetMetrics,
-          request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
-          response_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
-      ),
-      'GetMetrics': grpc.unary_unary_rpc_method_handler(
-          servicer.GetMetrics,
-          request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
-          response_serializer=InstanceCommunication__pb2.MetricsData.SerializeToString,
-      ),
-      'HealthCheck': grpc.unary_unary_rpc_method_handler(
-          servicer.HealthCheck,
-          request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
-          response_serializer=InstanceCommunication__pb2.HealthCheckResult.SerializeToString,
-      ),
-  }
-  generic_handler = grpc.method_handlers_generic_handler(
-      'proto.InstanceControl', rpc_method_handlers)
-  server.add_generic_rpc_handlers((generic_handler,))
+    rpc_method_handlers = {
+            'GetFunctionStatus': grpc.unary_unary_rpc_method_handler(
+                    servicer.GetFunctionStatus,
+                    request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
+                    response_serializer=InstanceCommunication__pb2.FunctionStatus.SerializeToString,
+            ),
+            'GetAndResetMetrics': grpc.unary_unary_rpc_method_handler(
+                    servicer.GetAndResetMetrics,
+                    request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
+                    response_serializer=InstanceCommunication__pb2.MetricsData.SerializeToString,
+            ),
+            'ResetMetrics': grpc.unary_unary_rpc_method_handler(
+                    servicer.ResetMetrics,
+                    request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
+                    response_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
+            ),
+            'GetMetrics': grpc.unary_unary_rpc_method_handler(
+                    servicer.GetMetrics,
+                    request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
+                    response_serializer=InstanceCommunication__pb2.MetricsData.SerializeToString,
+            ),
+            'HealthCheck': grpc.unary_unary_rpc_method_handler(
+                    servicer.HealthCheck,
+                    request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
+                    response_serializer=InstanceCommunication__pb2.HealthCheckResult.SerializeToString,
+            ),
+    }
+    generic_handler = grpc.method_handlers_generic_handler(
+            'proto.InstanceControl', rpc_method_handlers)
+    server.add_generic_rpc_handlers((generic_handler,))
+
+
+ # This class is part of an EXPERIMENTAL API.
+class InstanceControl(object):
+    """Missing associated documentation comment in .proto file."""
+
+    @staticmethod
+    def GetFunctionStatus(request,
+            target,
+            options=(),
+            channel_credentials=None,
+            call_credentials=None,
+            insecure=False,
+            compression=None,
+            wait_for_ready=None,
+            timeout=None,
+            metadata=None):
+        return grpc.experimental.unary_unary(request, target, '/proto.InstanceControl/GetFunctionStatus',
+            google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
+            InstanceCommunication__pb2.FunctionStatus.FromString,
+            options, channel_credentials,
+            insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
+
+    @staticmethod
+    def GetAndResetMetrics(request,
+            target,
+            options=(),
+            channel_credentials=None,
+            call_credentials=None,
+            insecure=False,
+            compression=None,
+            wait_for_ready=None,
+            timeout=None,
+            metadata=None):
+        return grpc.experimental.unary_unary(request, target, '/proto.InstanceControl/GetAndResetMetrics',
+            google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
+            InstanceCommunication__pb2.MetricsData.FromString,
+            options, channel_credentials,
+            insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
+
+    @staticmethod
+    def ResetMetrics(request,
+            target,
+            options=(),
+            channel_credentials=None,
+            call_credentials=None,
+            insecure=False,
+            compression=None,
+            wait_for_ready=None,
+            timeout=None,
+            metadata=None):
+        return grpc.experimental.unary_unary(request, target, '/proto.InstanceControl/ResetMetrics',
+            google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
+            google_dot_protobuf_dot_empty__pb2.Empty.FromString,
+            options, channel_credentials,
+            insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
+
+    @staticmethod
+    def GetMetrics(request,
+            target,
+            options=(),
+            channel_credentials=None,
+            call_credentials=None,
+            insecure=False,
+            compression=None,
+            wait_for_ready=None,
+            timeout=None,
+            metadata=None):
+        return grpc.experimental.unary_unary(request, target, '/proto.InstanceControl/GetMetrics',
+            google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
+            InstanceCommunication__pb2.MetricsData.FromString,
+            options, channel_credentials,
+            insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
+
+    @staticmethod
+    def HealthCheck(request,
+            target,
+            options=(),
+            channel_credentials=None,
+            call_credentials=None,
+            insecure=False,
+            compression=None,
+            wait_for_ready=None,
+            timeout=None,
+            metadata=None):
+        return grpc.experimental.unary_unary(request, target, '/proto.InstanceControl/HealthCheck',
+            google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
+            InstanceCommunication__pb2.HealthCheckResult.FromString,
+            options, channel_credentials,
+            insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py
index d6ff4fb..fecde7a 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -309,10 +309,18 @@ class PythonInstance(object):
             len(self.instance_config.function_details.sink.topic) > 0:
       Log.debug("Setting up producer for topic %s" % self.instance_config.function_details.sink.topic)
 
+      batch_type = pulsar.BatchingType.Default
+      if self.instance_config.function_details.sink.producerSpec.batchBuilder != None and \
+            len(self.instance_config.function_details.sink.producerSpec.batchBuilder) > 0:
+        batch_builder = self.instance_config.function_details.sink.producerSpec.batchBuilder
+        if batch_builder == "KEY_BASED":
+          batch_type = pulsar.BatchingType.KeyBased
+
       self.producer = self.pulsar_client.create_producer(
         str(self.instance_config.function_details.sink.topic),
         block_if_queue_full=True,
         batching_enabled=True,
+        batching_type=batch_type,
         batching_max_publish_delay_ms=10,
         compression_type=pulsar.CompressionType.LZ4,
         # set send timeout to be infinity to prevent potential deadlock with consumer
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 723cd46..d95d1d3 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -26,6 +26,7 @@ import net.jodah.failsafe.RetryPolicy;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.BatcherBuilder;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
@@ -981,6 +982,8 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
 
                 @Cleanup Producer<byte[]> producer = client.newProducer(Schema.BYTES)
                         .topic(inputTopicName)
+                        .enableBatching(true)
+                        .batcherBuilder(BatcherBuilder.DEFAULT)
                         .create();
 
                 for (int i = 0; i < numMessages; i++) {