You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/08 12:33:47 UTC

[pulsar] 01/38: [hotfix]Python function protobuf missing field and broker test failed (#6641)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d39113abe50c1de540f0f1d52a78007b5459afe3
Author: guangning <gu...@apache.org>
AuthorDate: Tue Mar 31 20:58:29 2020 +0800

    [hotfix]Python function protobuf missing field and broker test failed (#6641)
    
    At present, in the test, we found that due to the addition of a field `forwardSourceMessageProperty` in the proto file of function, this field was lost in the proto file generated by python and go. Due to python parsing with the following code:
    
    ```
    json_format.Parse(args.function_details, function_details)
    ```
    
    the following exception will be thrown.
    
    ```
    2020-03-30T13:13:25.2339031Z 13:13:24.379 [pulsar-external-listener-20-1] INFO  org.apache.pulsar.functions.runtime.process.ProcessRuntime - Started process successfully
    2020-03-30T13:13:25.2339190Z Traceback (most recent call last):
    2020-03-30T13:13:25.2340782Z   File "/pulsar/instances/python-instance/python_instance_main.py", line 211, in <module>
    2020-03-30T13:13:25.2340944Z     main()
    2020-03-30T13:13:25.2342589Z   File "/pulsar/instances/python-instance/python_instance_main.py", line 98, in main
    2020-03-30T13:13:25.2342744Z     json_format.Parse(args.function_details, function_details)
    2020-03-30T13:13:25.2354119Z   File "/usr/local/lib/python2.7/dist-packages/google/protobuf/json_format.py", line 430, in Parse
    2020-03-30T13:13:25.2354284Z     return ParseDict(js, message, ignore_unknown_fields, descriptor_pool)
    2020-03-30T13:13:25.2354689Z   File "/usr/local/lib/python2.7/dist-packages/google/protobuf/json_format.py", line 450, in ParseDict
    2020-03-30T13:13:25.2354882Z     parser.ConvertMessage(js_dict, message)
    2020-03-30T13:13:25.2355386Z   File "/usr/local/lib/python2.7/dist-packages/google/protobuf/json_format.py", line 481, in ConvertMessage
    2020-03-30T13:13:25.2355537Z     self._ConvertFieldValuePair(value, message)
    2020-03-30T13:13:25.2356082Z   File "/usr/local/lib/python2.7/dist-packages/google/protobuf/json_format.py", line 590, in _ConvertFieldValuePair
    2020-03-30T13:13:25.2356559Z     raise ParseError('Failed to parse {0} field: {1}.'.format(name, e))
    2020-03-30T13:13:25.2357199Z google.protobuf.json_format.ParseError: Failed to parse sink field: Message type "proto.SinkSpec" has no field named "forwardSourceMessageProperty".
    2020-03-30T13:13:25.2357634Z  Available Fields(except extensions): ['className', 'configs', 'typeClassName', 'topic', 'serDeClassName', 'builtin', 'schemaType'].
    ```
    
    This pr is mainly to fix the proto file generated by python first. In order to make the test pass smoothly, I will gradually fix this problem in other languages in the next pull request.
    
    * Enable build docker image to pulsar and pulsar-all.
    * Add new generated protobuf file for python
    * Disable go function integration test
    * Add sleep for method testGetPartitionedStatsInternal
    
    The integration process test passed https://github.com/AmateurEvents/pulsar/pull/22
    (cherry picked from commit c955ff9ebf363175991704d5dd17da7bdc5e7468)
---
 .github/workflows/ci-integration-process.yaml      |   8 +
 .github/workflows/ci-integration-thread.yaml       |   8 +
 .../apache/pulsar/broker/admin/AdminApiTest.java   |   2 +
 .../instance/src/main/python/Function_pb2.py       | 320 ++++++++++++---------
 4 files changed, 201 insertions(+), 137 deletions(-)

diff --git a/.github/workflows/ci-integration-process.yaml b/.github/workflows/ci-integration-process.yaml
index 74f6960..cf762da 100644
--- a/.github/workflows/ci-integration-process.yaml
+++ b/.github/workflows/ci-integration-process.yaml
@@ -63,6 +63,14 @@ jobs:
         run: mvn -B -f docker/pulsar-all/pom.xml install -am -Pdocker -DskipTests -Ddocker.nocache=true
 
       - name: build artifacts and docker image
+        if: steps.docs.outputs.changed_only == 'no'
+        run: mvn -B -f docker/pulsar/pom.xml install -am -Pdocker -DskipTests -Ddocker.nocache=true
+
+      - name: build pulsar-all image
+        if: steps.docs.outputs.changed_only == 'no'
+        run: mvn -B -f docker/pulsar-all/pom.xml install -am -Pdocker -DskipTests -Ddocker.nocache=true
+
+      - name: build artifacts and docker image
         run: mvn -B install -Dorg.slf4j.simpleLogger.defaultLogLevel=ERROR -Pdocker -DskipTests
 
       - name: run integration tests
diff --git a/.github/workflows/ci-integration-thread.yaml b/.github/workflows/ci-integration-thread.yaml
index 79a702a..687b1b2 100644
--- a/.github/workflows/ci-integration-thread.yaml
+++ b/.github/workflows/ci-integration-thread.yaml
@@ -63,6 +63,14 @@ jobs:
         run: mvn -B -f docker/pulsar-all/pom.xml install -am -Pdocker -DskipTests -Ddocker.nocache=true
 
       - name: build artifacts and docker image
+        if: steps.docs.outputs.changed_only == 'no'
+        run: mvn -B -f docker/pulsar/pom.xml install -am -Pdocker -DskipTests -Ddocker.nocache=true
+
+      - name: build pulsar-all image
+        if: steps.docs.outputs.changed_only == 'no'
+        run: mvn -B -f docker/pulsar-all/pom.xml install -am -Pdocker -DskipTests -Ddocker.nocache=true
+
+      - name: build artifacts and docker image
         run: mvn -B install -Dorg.slf4j.simpleLogger.defaultLogLevel=ERROR -Pdocker -DskipTests
 
       - name: run integration tests
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 1598be7..ae98235 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -48,11 +48,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
+import javax.validation.constraints.AssertTrue;
 import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.core.Response.Status;
diff --git a/pulsar-functions/instance/src/main/python/Function_pb2.py b/pulsar-functions/instance/src/main/python/Function_pb2.py
index 409f749..108ed79 100644
--- a/pulsar-functions/instance/src/main/python/Function_pb2.py
+++ b/pulsar-functions/instance/src/main/python/Function_pb2.py
@@ -21,8 +21,6 @@
 # Generated by the protocol buffer compiler.  DO NOT EDIT!
 # source: Function.proto
 
-import sys
-_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
 from google.protobuf.internal import enum_type_wrapper
 from google.protobuf import descriptor as _descriptor
 from google.protobuf import message as _message
@@ -39,8 +37,8 @@ DESCRIPTOR = _descriptor.FileDescriptor(
   name='Function.proto',
   package='proto',
   syntax='proto3',
-  serialized_options=_b('\n!org.apache.pulsar.functions.protoB\010Function'),
-  serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x03\"B\n\x0cRetryDetails\x12\x19\n\x11maxMessageRetries\x18\x01 \x01(\x05\x12\x17\n\x0f\x64\x65\x61\x64LetterTopic\x18\x02 \x01(\t\"\x85\x05\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x1 [...]
+  serialized_options=b'\n!org.apache.pulsar.functions.protoB\010Function',
+  serialized_pb=b'\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x03\"B\n\x0cRetryDetails\x12\x19\n\x11maxMessageRetries\x18\x01 \x01(\x05\x12\x17\n\x0f\x64\x65\x61\x64LetterTopic\x18\x02 \x01(\t\"\xa3\x05\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12\ [...]
 )
 
 _PROCESSINGGUARANTEES = _descriptor.EnumDescriptor(
@@ -64,8 +62,8 @@ _PROCESSINGGUARANTEES = _descriptor.EnumDescriptor(
   ],
   containing_type=None,
   serialized_options=None,
-  serialized_start=2302,
-  serialized_end=2381,
+  serialized_start=2429,
+  serialized_end=2508,
 )
 _sym_db.RegisterEnumDescriptor(_PROCESSINGGUARANTEES)
 
@@ -87,12 +85,35 @@ _SUBSCRIPTIONTYPE = _descriptor.EnumDescriptor(
   ],
   containing_type=None,
   serialized_options=None,
-  serialized_start=2383,
-  serialized_end=2427,
+  serialized_start=2510,
+  serialized_end=2554,
 )
 _sym_db.RegisterEnumDescriptor(_SUBSCRIPTIONTYPE)
 
 SubscriptionType = enum_type_wrapper.EnumTypeWrapper(_SUBSCRIPTIONTYPE)
+_SUBSCRIPTIONPOSITION = _descriptor.EnumDescriptor(
+  name='SubscriptionPosition',
+  full_name='proto.SubscriptionPosition',
+  filename=None,
+  file=DESCRIPTOR,
+  values=[
+    _descriptor.EnumValueDescriptor(
+      name='LATEST', index=0, number=0,
+      serialized_options=None,
+      type=None),
+    _descriptor.EnumValueDescriptor(
+      name='EARLIEST', index=1, number=1,
+      serialized_options=None,
+      type=None),
+  ],
+  containing_type=None,
+  serialized_options=None,
+  serialized_start=2556,
+  serialized_end=2604,
+)
+_sym_db.RegisterEnumDescriptor(_SUBSCRIPTIONPOSITION)
+
+SubscriptionPosition = enum_type_wrapper.EnumTypeWrapper(_SUBSCRIPTIONPOSITION)
 _FUNCTIONSTATE = _descriptor.EnumDescriptor(
   name='FunctionState',
   full_name='proto.FunctionState',
@@ -110,8 +131,8 @@ _FUNCTIONSTATE = _descriptor.EnumDescriptor(
   ],
   containing_type=None,
   serialized_options=None,
-  serialized_start=2429,
-  serialized_end=2470,
+  serialized_start=2606,
+  serialized_end=2647,
 )
 _sym_db.RegisterEnumDescriptor(_FUNCTIONSTATE)
 
@@ -121,6 +142,8 @@ ATMOST_ONCE = 1
 EFFECTIVELY_ONCE = 2
 SHARED = 0
 FAILOVER = 1
+LATEST = 0
+EARLIEST = 1
 RUNNING = 0
 STOPPED = 1
 
@@ -146,8 +169,8 @@ _FUNCTIONDETAILS_RUNTIME = _descriptor.EnumDescriptor(
   ],
   containing_type=None,
   serialized_options=None,
-  serialized_start=687,
-  serialized_end=726,
+  serialized_start=717,
+  serialized_end=756,
 )
 _sym_db.RegisterEnumDescriptor(_FUNCTIONDETAILS_RUNTIME)
 
@@ -176,8 +199,8 @@ _FUNCTIONDETAILS_COMPONENTTYPE = _descriptor.EnumDescriptor(
   ],
   containing_type=None,
   serialized_options=None,
-  serialized_start=728,
-  serialized_end=792,
+  serialized_start=758,
+  serialized_end=822,
 )
 _sym_db.RegisterEnumDescriptor(_FUNCTIONDETAILS_COMPONENTTYPE)
 
@@ -244,7 +267,7 @@ _RETRYDETAILS = _descriptor.Descriptor(
     _descriptor.FieldDescriptor(
       name='deadLetterTopic', full_name='proto.RetryDetails.deadLetterTopic', index=1,
       number=2, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
@@ -275,35 +298,35 @@ _FUNCTIONDETAILS = _descriptor.Descriptor(
     _descriptor.FieldDescriptor(
       name='tenant', full_name='proto.FunctionDetails.tenant', index=0,
       number=1, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='namespace', full_name='proto.FunctionDetails.namespace', index=1,
       number=2, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='name', full_name='proto.FunctionDetails.name', index=2,
       number=3, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='className', full_name='proto.FunctionDetails.className', index=3,
       number=4, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='logTopic', full_name='proto.FunctionDetails.logTopic', index=4,
       number=5, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
@@ -317,14 +340,14 @@ _FUNCTIONDETAILS = _descriptor.Descriptor(
     _descriptor.FieldDescriptor(
       name='userConfig', full_name='proto.FunctionDetails.userConfig', index=6,
       number=7, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='secretsMap', full_name='proto.FunctionDetails.secretsMap', index=7,
       number=16, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
@@ -373,7 +396,7 @@ _FUNCTIONDETAILS = _descriptor.Descriptor(
     _descriptor.FieldDescriptor(
       name='packageUrl', full_name='proto.FunctionDetails.packageUrl', index=14,
       number=14, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
@@ -387,7 +410,7 @@ _FUNCTIONDETAILS = _descriptor.Descriptor(
     _descriptor.FieldDescriptor(
       name='runtimeFlags', full_name='proto.FunctionDetails.runtimeFlags', index=16,
       number=17, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
@@ -398,6 +421,13 @@ _FUNCTIONDETAILS = _descriptor.Descriptor(
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='customRuntimeOptions', full_name='proto.FunctionDetails.customRuntimeOptions', index=18,
+      number=19, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=b"".decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR),
   ],
   extensions=[
   ],
@@ -413,7 +443,7 @@ _FUNCTIONDETAILS = _descriptor.Descriptor(
   oneofs=[
   ],
   serialized_start=147,
-  serialized_end=792,
+  serialized_end=822,
 )
 
 
@@ -443,8 +473,8 @@ _CONSUMERSPEC_RECEIVERQUEUESIZE = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=945,
-  serialized_end=979,
+  serialized_start=975,
+  serialized_end=1009,
 )
 
 _CONSUMERSPEC = _descriptor.Descriptor(
@@ -457,14 +487,14 @@ _CONSUMERSPEC = _descriptor.Descriptor(
     _descriptor.FieldDescriptor(
       name='schemaType', full_name='proto.ConsumerSpec.schemaType', index=0,
       number=1, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='serdeClassName', full_name='proto.ConsumerSpec.serdeClassName', index=1,
       number=2, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
@@ -494,8 +524,8 @@ _CONSUMERSPEC = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=795,
-  serialized_end=979,
+  serialized_start=825,
+  serialized_end=1009,
 )
 
 
@@ -509,14 +539,14 @@ _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY = _descriptor.Descriptor(
     _descriptor.FieldDescriptor(
       name='key', full_name='proto.SourceSpec.TopicsToSerDeClassNameEntry.key', index=0,
       number=1, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='value', full_name='proto.SourceSpec.TopicsToSerDeClassNameEntry.value', index=1,
       number=2, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
@@ -526,14 +556,14 @@ _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY = _descriptor.Descriptor(
   nested_types=[],
   enum_types=[
   ],
-  serialized_options=_b('8\001'),
+  serialized_options=b'8\001',
   is_extendable=False,
   syntax='proto3',
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1362,
-  serialized_end=1423,
+  serialized_start=1451,
+  serialized_end=1512,
 )
 
 _SOURCESPEC_INPUTSPECSENTRY = _descriptor.Descriptor(
@@ -546,7 +576,7 @@ _SOURCESPEC_INPUTSPECSENTRY = _descriptor.Descriptor(
     _descriptor.FieldDescriptor(
       name='key', full_name='proto.SourceSpec.InputSpecsEntry.key', index=0,
       number=1, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
@@ -563,14 +593,14 @@ _SOURCESPEC_INPUTSPECSENTRY = _descriptor.Descriptor(
   nested_types=[],
   enum_types=[
   ],
-  serialized_options=_b('8\001'),
+  serialized_options=b'8\001',
   is_extendable=False,
   syntax='proto3',
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1425,
-  serialized_end=1495,
+  serialized_start=1514,
+  serialized_end=1584,
 )
 
 _SOURCESPEC = _descriptor.Descriptor(
@@ -583,21 +613,21 @@ _SOURCESPEC = _descriptor.Descriptor(
     _descriptor.FieldDescriptor(
       name='className', full_name='proto.SourceSpec.className', index=0,
       number=1, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='configs', full_name='proto.SourceSpec.configs', index=1,
       number=2, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='typeClassName', full_name='proto.SourceSpec.typeClassName', index=2,
       number=5, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
@@ -614,7 +644,7 @@ _SOURCESPEC = _descriptor.Descriptor(
       has_default_value=False, default_value=[],
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=_b('\030\001'), file=DESCRIPTOR),
+      serialized_options=b'\030\001', file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='inputSpecs', full_name='proto.SourceSpec.inputSpecs', index=5,
       number=10, type=11, cpp_type=10, label=3,
@@ -632,21 +662,21 @@ _SOURCESPEC = _descriptor.Descriptor(
     _descriptor.FieldDescriptor(
       name='topicsPattern', full_name='proto.SourceSpec.topicsPattern', index=7,
       number=7, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=_b('\030\001'), file=DESCRIPTOR),
+      serialized_options=b'\030\001', file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='builtin', full_name='proto.SourceSpec.builtin', index=8,
       number=8, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='subscriptionName', full_name='proto.SourceSpec.subscriptionName', index=9,
       number=9, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
@@ -657,6 +687,13 @@ _SOURCESPEC = _descriptor.Descriptor(
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='subscriptionPosition', full_name='proto.SourceSpec.subscriptionPosition', index=11,
+      number=12, type=14, cpp_type=8, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR),
   ],
   extensions=[
   ],
@@ -669,8 +706,8 @@ _SOURCESPEC = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=982,
-  serialized_end=1495,
+  serialized_start=1012,
+  serialized_end=1584,
 )
 
 
@@ -684,49 +721,56 @@ _SINKSPEC = _descriptor.Descriptor(
     _descriptor.FieldDescriptor(
       name='className', full_name='proto.SinkSpec.className', index=0,
       number=1, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='configs', full_name='proto.SinkSpec.configs', index=1,
       number=2, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='typeClassName', full_name='proto.SinkSpec.typeClassName', index=2,
       number=5, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='topic', full_name='proto.SinkSpec.topic', index=3,
       number=3, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='serDeClassName', full_name='proto.SinkSpec.serDeClassName', index=4,
       number=4, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='builtin', full_name='proto.SinkSpec.builtin', index=5,
       number=6, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='schemaType', full_name='proto.SinkSpec.schemaType', index=6,
       number=7, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      has_default_value=False, default_value=b"".decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='forwardSourceMessageProperty', full_name='proto.SinkSpec.forwardSourceMessageProperty', index=7,
+      number=8, type=8, cpp_type=7, label=1,
+      has_default_value=False, default_value=False,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
@@ -742,8 +786,8 @@ _SINKSPEC = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1498,
-  serialized_end=1643,
+  serialized_start=1587,
+  serialized_end=1770,
 )
 
 
@@ -757,14 +801,14 @@ _PACKAGELOCATIONMETADATA = _descriptor.Descriptor(
     _descriptor.FieldDescriptor(
       name='packagePath', full_name='proto.PackageLocationMetaData.packagePath', index=0,
       number=1, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='originalFileName', full_name='proto.PackageLocationMetaData.originalFileName', index=1,
       number=2, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
@@ -780,8 +824,8 @@ _PACKAGELOCATIONMETADATA = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1645,
-  serialized_end=1717,
+  serialized_start=1772,
+  serialized_end=1844,
 )
 
 
@@ -812,14 +856,14 @@ _FUNCTIONMETADATA_INSTANCESTATESENTRY = _descriptor.Descriptor(
   nested_types=[],
   enum_types=[
   ],
-  serialized_options=_b('8\001'),
+  serialized_options=b'8\001',
   is_extendable=False,
   syntax='proto3',
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=2013,
-  serialized_end=2088,
+  serialized_start=2140,
+  serialized_end=2215,
 )
 
 _FUNCTIONMETADATA = _descriptor.Descriptor(
@@ -883,8 +927,8 @@ _FUNCTIONMETADATA = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1720,
-  serialized_end=2088,
+  serialized_start=1847,
+  serialized_end=2215,
 )
 
 
@@ -898,14 +942,14 @@ _FUNCTIONAUTHENTICATIONSPEC = _descriptor.Descriptor(
     _descriptor.FieldDescriptor(
       name='data', full_name='proto.FunctionAuthenticationSpec.data', index=0,
       number=1, type=12, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b(""),
+      has_default_value=False, default_value=b"",
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='provider', full_name='proto.FunctionAuthenticationSpec.provider', index=1,
       number=2, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
@@ -921,8 +965,8 @@ _FUNCTIONAUTHENTICATIONSPEC = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=2090,
-  serialized_end=2150,
+  serialized_start=2217,
+  serialized_end=2277,
 )
 
 
@@ -959,8 +1003,8 @@ _INSTANCE = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=2152,
-  serialized_end=2233,
+  serialized_start=2279,
+  serialized_end=2360,
 )
 
 
@@ -981,7 +1025,7 @@ _ASSIGNMENT = _descriptor.Descriptor(
     _descriptor.FieldDescriptor(
       name='workerId', full_name='proto.Assignment.workerId', index=1,
       number=2, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
@@ -997,8 +1041,8 @@ _ASSIGNMENT = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=2235,
-  serialized_end=2300,
+  serialized_start=2362,
+  serialized_end=2427,
 )
 
 _FUNCTIONDETAILS.fields_by_name['processingGuarantees'].enum_type = _PROCESSINGGUARANTEES
@@ -1018,6 +1062,7 @@ _SOURCESPEC_INPUTSPECSENTRY.containing_type = _SOURCESPEC
 _SOURCESPEC.fields_by_name['subscriptionType'].enum_type = _SUBSCRIPTIONTYPE
 _SOURCESPEC.fields_by_name['topicsToSerDeClassName'].message_type = _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY
 _SOURCESPEC.fields_by_name['inputSpecs'].message_type = _SOURCESPEC_INPUTSPECSENTRY
+_SOURCESPEC.fields_by_name['subscriptionPosition'].enum_type = _SUBSCRIPTIONPOSITION
 _FUNCTIONMETADATA_INSTANCESTATESENTRY.fields_by_name['value'].enum_type = _FUNCTIONSTATE
 _FUNCTIONMETADATA_INSTANCESTATESENTRY.containing_type = _FUNCTIONMETADATA
 _FUNCTIONMETADATA.fields_by_name['functionDetails'].message_type = _FUNCTIONDETAILS
@@ -1039,116 +1084,117 @@ DESCRIPTOR.message_types_by_name['Instance'] = _INSTANCE
 DESCRIPTOR.message_types_by_name['Assignment'] = _ASSIGNMENT
 DESCRIPTOR.enum_types_by_name['ProcessingGuarantees'] = _PROCESSINGGUARANTEES
 DESCRIPTOR.enum_types_by_name['SubscriptionType'] = _SUBSCRIPTIONTYPE
+DESCRIPTOR.enum_types_by_name['SubscriptionPosition'] = _SUBSCRIPTIONPOSITION
 DESCRIPTOR.enum_types_by_name['FunctionState'] = _FUNCTIONSTATE
 _sym_db.RegisterFileDescriptor(DESCRIPTOR)
 
-Resources = _reflection.GeneratedProtocolMessageType('Resources', (_message.Message,), dict(
-  DESCRIPTOR = _RESOURCES,
-  __module__ = 'Function_pb2'
+Resources = _reflection.GeneratedProtocolMessageType('Resources', (_message.Message,), {
+  'DESCRIPTOR' : _RESOURCES,
+  '__module__' : 'Function_pb2'
   # @@protoc_insertion_point(class_scope:proto.Resources)
-  ))
+  })
 _sym_db.RegisterMessage(Resources)
 
-RetryDetails = _reflection.GeneratedProtocolMessageType('RetryDetails', (_message.Message,), dict(
-  DESCRIPTOR = _RETRYDETAILS,
-  __module__ = 'Function_pb2'
+RetryDetails = _reflection.GeneratedProtocolMessageType('RetryDetails', (_message.Message,), {
+  'DESCRIPTOR' : _RETRYDETAILS,
+  '__module__' : 'Function_pb2'
   # @@protoc_insertion_point(class_scope:proto.RetryDetails)
-  ))
+  })
 _sym_db.RegisterMessage(RetryDetails)
 
-FunctionDetails = _reflection.GeneratedProtocolMessageType('FunctionDetails', (_message.Message,), dict(
-  DESCRIPTOR = _FUNCTIONDETAILS,
-  __module__ = 'Function_pb2'
+FunctionDetails = _reflection.GeneratedProtocolMessageType('FunctionDetails', (_message.Message,), {
+  'DESCRIPTOR' : _FUNCTIONDETAILS,
+  '__module__' : 'Function_pb2'
   # @@protoc_insertion_point(class_scope:proto.FunctionDetails)
-  ))
+  })
 _sym_db.RegisterMessage(FunctionDetails)
 
-ConsumerSpec = _reflection.GeneratedProtocolMessageType('ConsumerSpec', (_message.Message,), dict(
+ConsumerSpec = _reflection.GeneratedProtocolMessageType('ConsumerSpec', (_message.Message,), {
 
-  ReceiverQueueSize = _reflection.GeneratedProtocolMessageType('ReceiverQueueSize', (_message.Message,), dict(
-    DESCRIPTOR = _CONSUMERSPEC_RECEIVERQUEUESIZE,
-    __module__ = 'Function_pb2'
+  'ReceiverQueueSize' : _reflection.GeneratedProtocolMessageType('ReceiverQueueSize', (_message.Message,), {
+    'DESCRIPTOR' : _CONSUMERSPEC_RECEIVERQUEUESIZE,
+    '__module__' : 'Function_pb2'
     # @@protoc_insertion_point(class_scope:proto.ConsumerSpec.ReceiverQueueSize)
-    ))
+    })
   ,
-  DESCRIPTOR = _CONSUMERSPEC,
-  __module__ = 'Function_pb2'
+  'DESCRIPTOR' : _CONSUMERSPEC,
+  '__module__' : 'Function_pb2'
   # @@protoc_insertion_point(class_scope:proto.ConsumerSpec)
-  ))
+  })
 _sym_db.RegisterMessage(ConsumerSpec)
 _sym_db.RegisterMessage(ConsumerSpec.ReceiverQueueSize)
 
-SourceSpec = _reflection.GeneratedProtocolMessageType('SourceSpec', (_message.Message,), dict(
+SourceSpec = _reflection.GeneratedProtocolMessageType('SourceSpec', (_message.Message,), {
 
-  TopicsToSerDeClassNameEntry = _reflection.GeneratedProtocolMessageType('TopicsToSerDeClassNameEntry', (_message.Message,), dict(
-    DESCRIPTOR = _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY,
-    __module__ = 'Function_pb2'
+  'TopicsToSerDeClassNameEntry' : _reflection.GeneratedProtocolMessageType('TopicsToSerDeClassNameEntry', (_message.Message,), {
+    'DESCRIPTOR' : _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY,
+    '__module__' : 'Function_pb2'
     # @@protoc_insertion_point(class_scope:proto.SourceSpec.TopicsToSerDeClassNameEntry)
-    ))
+    })
   ,
 
-  InputSpecsEntry = _reflection.GeneratedProtocolMessageType('InputSpecsEntry', (_message.Message,), dict(
-    DESCRIPTOR = _SOURCESPEC_INPUTSPECSENTRY,
-    __module__ = 'Function_pb2'
+  'InputSpecsEntry' : _reflection.GeneratedProtocolMessageType('InputSpecsEntry', (_message.Message,), {
+    'DESCRIPTOR' : _SOURCESPEC_INPUTSPECSENTRY,
+    '__module__' : 'Function_pb2'
     # @@protoc_insertion_point(class_scope:proto.SourceSpec.InputSpecsEntry)
-    ))
+    })
   ,
-  DESCRIPTOR = _SOURCESPEC,
-  __module__ = 'Function_pb2'
+  'DESCRIPTOR' : _SOURCESPEC,
+  '__module__' : 'Function_pb2'
   # @@protoc_insertion_point(class_scope:proto.SourceSpec)
-  ))
+  })
 _sym_db.RegisterMessage(SourceSpec)
 _sym_db.RegisterMessage(SourceSpec.TopicsToSerDeClassNameEntry)
 _sym_db.RegisterMessage(SourceSpec.InputSpecsEntry)
 
-SinkSpec = _reflection.GeneratedProtocolMessageType('SinkSpec', (_message.Message,), dict(
-  DESCRIPTOR = _SINKSPEC,
-  __module__ = 'Function_pb2'
+SinkSpec = _reflection.GeneratedProtocolMessageType('SinkSpec', (_message.Message,), {
+  'DESCRIPTOR' : _SINKSPEC,
+  '__module__' : 'Function_pb2'
   # @@protoc_insertion_point(class_scope:proto.SinkSpec)
-  ))
+  })
 _sym_db.RegisterMessage(SinkSpec)
 
-PackageLocationMetaData = _reflection.GeneratedProtocolMessageType('PackageLocationMetaData', (_message.Message,), dict(
-  DESCRIPTOR = _PACKAGELOCATIONMETADATA,
-  __module__ = 'Function_pb2'
+PackageLocationMetaData = _reflection.GeneratedProtocolMessageType('PackageLocationMetaData', (_message.Message,), {
+  'DESCRIPTOR' : _PACKAGELOCATIONMETADATA,
+  '__module__' : 'Function_pb2'
   # @@protoc_insertion_point(class_scope:proto.PackageLocationMetaData)
-  ))
+  })
 _sym_db.RegisterMessage(PackageLocationMetaData)
 
-FunctionMetaData = _reflection.GeneratedProtocolMessageType('FunctionMetaData', (_message.Message,), dict(
+FunctionMetaData = _reflection.GeneratedProtocolMessageType('FunctionMetaData', (_message.Message,), {
 
-  InstanceStatesEntry = _reflection.GeneratedProtocolMessageType('InstanceStatesEntry', (_message.Message,), dict(
-    DESCRIPTOR = _FUNCTIONMETADATA_INSTANCESTATESENTRY,
-    __module__ = 'Function_pb2'
+  'InstanceStatesEntry' : _reflection.GeneratedProtocolMessageType('InstanceStatesEntry', (_message.Message,), {
+    'DESCRIPTOR' : _FUNCTIONMETADATA_INSTANCESTATESENTRY,
+    '__module__' : 'Function_pb2'
     # @@protoc_insertion_point(class_scope:proto.FunctionMetaData.InstanceStatesEntry)
-    ))
+    })
   ,
-  DESCRIPTOR = _FUNCTIONMETADATA,
-  __module__ = 'Function_pb2'
+  'DESCRIPTOR' : _FUNCTIONMETADATA,
+  '__module__' : 'Function_pb2'
   # @@protoc_insertion_point(class_scope:proto.FunctionMetaData)
-  ))
+  })
 _sym_db.RegisterMessage(FunctionMetaData)
 _sym_db.RegisterMessage(FunctionMetaData.InstanceStatesEntry)
 
-FunctionAuthenticationSpec = _reflection.GeneratedProtocolMessageType('FunctionAuthenticationSpec', (_message.Message,), dict(
-  DESCRIPTOR = _FUNCTIONAUTHENTICATIONSPEC,
-  __module__ = 'Function_pb2'
+FunctionAuthenticationSpec = _reflection.GeneratedProtocolMessageType('FunctionAuthenticationSpec', (_message.Message,), {
+  'DESCRIPTOR' : _FUNCTIONAUTHENTICATIONSPEC,
+  '__module__' : 'Function_pb2'
   # @@protoc_insertion_point(class_scope:proto.FunctionAuthenticationSpec)
-  ))
+  })
 _sym_db.RegisterMessage(FunctionAuthenticationSpec)
 
-Instance = _reflection.GeneratedProtocolMessageType('Instance', (_message.Message,), dict(
-  DESCRIPTOR = _INSTANCE,
-  __module__ = 'Function_pb2'
+Instance = _reflection.GeneratedProtocolMessageType('Instance', (_message.Message,), {
+  'DESCRIPTOR' : _INSTANCE,
+  '__module__' : 'Function_pb2'
   # @@protoc_insertion_point(class_scope:proto.Instance)
-  ))
+  })
 _sym_db.RegisterMessage(Instance)
 
-Assignment = _reflection.GeneratedProtocolMessageType('Assignment', (_message.Message,), dict(
-  DESCRIPTOR = _ASSIGNMENT,
-  __module__ = 'Function_pb2'
+Assignment = _reflection.GeneratedProtocolMessageType('Assignment', (_message.Message,), {
+  'DESCRIPTOR' : _ASSIGNMENT,
+  '__module__' : 'Function_pb2'
   # @@protoc_insertion_point(class_scope:proto.Assignment)
-  ))
+  })
 _sym_db.RegisterMessage(Assignment)