Posted to user@beam.apache.org by Lien Michiels <li...@froomle.com> on 2020/06/08 11:15:22 UTC

BEAM-2717 NotImplementedError - DataflowRunner parsing Protos from PubSub (Python SDK)

Hi everyone,

First time writing to the mailing list, so please tell me if I'm doing
this all wrong.

Using the Python 3 SDK, I'm building a streaming pipeline that reads from
PubSub and writes to BQ, to be run on the DataflowRunner.

I can get the pipeline started fine with the DirectRunner, but as soon as I
try to deploy to Dataflow it throws the following error:

File "/usr/local/lib/python3.7/site-packages/apache_beam/coders/coders.py",
line 221, in to_type_hint
    raise NotImplementedError('BEAM-2717')

I've tried narrowing down what exactly could be causing the issue, and it
appears to be the second step in my pipeline, which transforms the bytes
read from PubSub into my own internal Proto format:

def parse_message_blobs(x: bytes) -> ActionWrapper:
    action_wrapper = ActionWrapper()
    action_wrapper.ParseFromString(x)
    return action_wrapper

which is applied as a Map step.
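
For context, the read and parse steps are wired up roughly like this
(simplified; pipeline_options and input_subscription stand in for my actual
CLI options):

import apache_beam as beam

with beam.Pipeline(options=pipeline_options) as p:
    actions = (
        p
        # PubSub delivers raw bytes, which the next step parses.
        | "ReadFromPubSub" >> beam.io.ReadFromPubSub(subscription=input_subscription)
        | "ParseProtos" >> beam.Map(parse_message_blobs)
    )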

I've added typehints to all downstream steps as follows:
def partition_by_environment(
    x: ActionWrapper, num_partitions: int, environments: List[str]
) -> int:
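
which I then use with a Partition transform, roughly like this (continuing
the sketch above, with environments coming from my options):

# Partition passes each element plus the partition count to the function;
# the extra environments argument is forwarded through.
partitions = actions | "PartitionByEnvironment" >> beam.Partition(
    partition_by_environment, len(environments), environments
)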

I'd really appreciate it if anyone could let me know what I'm doing wrong,
or what the issue this error refers to actually is. I read the JIRA
ticket, but did not understand how it relates to my problem here.

Thanks!
Kind regards,
Lien

Re: BEAM-2717 NotImplementedError - DataflowRunner parsing Protos from PubSub (Python SDK)

Posted by Lien Michiels <li...@froomle.com>.
Hi Udi, Brian,

Thanks so much 🙌  I'm now able to deploy the pipeline.


Lien


--
Lien Michiels
Data Scientist & Solutions Architect
FROOMLE
m: +32 483 71 87 36
w: froomle.ai  e: lien.michiels@froomle.com
<https://www.linkedin.com/in/lien-michiels/>

Re: BEAM-2717 NotImplementedError - DataflowRunner parsing Protos from PubSub (Python SDK)

Posted by Udi Meiri <eh...@google.com>.
I don't believe you need to register a coder unless you create your own
coder class.

Since the coder has a proto_message_type field, it could return that in
to_type_hint, but I'll defer to Robert since I'm unfamiliar with that area.
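
Roughly this, I'd imagine (untested sketch, not a reviewed patch):

def to_type_hint(self):
    # On ProtoCoder: the proto message class the coder was constructed
    # with doubles as the element type hint.
    return self.proto_message_type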

As a temporary workaround, please try overriding the output type like so:
  Map(parse_message_blobs).with_output_types(typing.Any)

That should prevent Beam from attempting to use the ProtoCoder.
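
In context, that would look something like this (p and the read step are
placeholders for however your pipeline is actually set up):

import typing

actions = (
    p
    | "ReadFromPubSub" >> beam.io.ReadFromPubSub(subscription=input_subscription)
    # typing.Any stops Beam from picking ProtoCoder for this output.
    | "ParseProtos" >> beam.Map(parse_message_blobs).with_output_types(typing.Any)
)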


Re: BEAM-2717 NotImplementedError - DataflowRunner parsing Protos from PubSub (Python SDK)

Posted by Brian Hulette <bh...@google.com>.
+Udi Meiri <eh...@google.com> - do you have any suggestions to resolve this?

It looks like there are a couple of things going wrong:
- ProtoCoder doesn't have a definition for to_type_hint
- DataflowRunner calls from_runner_api, which I believe is a workaround we
can eventually remove (+Robert Bradshaw <ro...@google.com>), which in
turn tries to get type hints for every coder in the pipeline

Brian


Re: BEAM-2717 NotImplementedError - DataflowRunner parsing Protos from PubSub (Python SDK)

Posted by Lien Michiels <li...@froomle.com>.
Hi Brian,

Thanks so much for your quick response!

I've tried with both Apache Beam 2.20.0 and 2.21.0, both result in the
exact same error. Here is the full stacktrace:

(metadata-persistor) ➜  metadata-persistor git:(feature/DEV-1249) ✗
metadata_persistor --project XXXXX --environments XXXXX --window_size 1
--input_subscription
projects/XXXXXXX/subscriptions/mds-internal-item-metadata-subscription
--runner DataflowRunner --temp_location gs://XXXXXXXX/item-metadata/temp
--staging_location gs://XXXXXXXXXXX/item-metadata/staging
/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py:1431:
BeamDeprecationWarning: options is deprecated since First stable release.
References to <pipeline>.options will not be supported
  experiments = p.options.view_as(DebugOptions).experiments or []
WARNING:root:Make sure that locally built Python SDK docker image has
Python 3.7 interpreter.
Traceback (most recent call last):
  File "/Users/lienmichiels/.virtualenvs/metadata-persistor/bin/metadata_persistor", line 8, in <module>
    sys.exit(run())
  File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/metadata_persistor/item_metadata_mds_persistor.py", line 246, in run
    batch_size=500,
  File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", line 524, in __exit__
    self.run().wait_until_finish()
  File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", line 497, in run
    self._options).run(False)
  File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", line 510, in run
    return self.runner.run_pipeline(self, self._options)
  File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 484, in run_pipeline
    allow_proto_holders=True)
  File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", line 858, in from_runner_api
    p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
  File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 103, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
  File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1238, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 103, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
  File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1244, in from_runner_api
    id in proto.outputs.items()
  File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1244, in <dictcomp>
    id in proto.outputs.items()
  File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 103, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
  File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pvalue.py", line 214, in from_runner_api
    element_type=context.element_type_from_coder_id(proto.coder_id),
  File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 227, in element_type_from_coder_id
    self.coders[coder_id].to_type_hint())
  File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/coders/coders.py", line 221, in to_type_hint
    raise NotImplementedError('BEAM-2717')
NotImplementedError: BEAM-2717

When I was debugging and commenting out the different steps, I noticed the
location in my code that supposedly throws the error changes. Here it
complains about the WriteToBigQuery step (batch_size=500) but if I comment
out that step it just moves on to the one above. It appears it's
consistently thrown on the last run step (don't know if that's helpful,
just thought I'd mention it).

After adding beam.typehints.disable_type_annotations() it still throws the
same error.

Another thing I forgot to mention in my first email is that I registered a
ProtoCoder as suggested at the bottom of this page (
https://beam.apache.org/documentation/sdks/python-type-safety/) as:

beam.coders.registry.register_coder(ActionWrapper, ProtoCoder)
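
i.e. at module level, before the pipeline is constructed, roughly:

import apache_beam as beam
from apache_beam.coders.coders import ProtoCoder

# ActionWrapper is my generated protobuf message class.
beam.coders.registry.register_coder(ActionWrapper, ProtoCoder)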

Thanks again, really appreciate your help!
Lien


Re: BEAM-2717 NotImplementedError - DataflowRunner parsing Protos from PubSub (Python SDK)

Posted by Brian Hulette <bh...@google.com>.
Hi Lien,

> First time writing to the mailing list, so please tell me if I'm doing
> this all wrong.

Not at all! This is exactly the kind of question this list is for.

I have a couple of questions that may help us debug:
- Can you share the full stacktrace?
- What version of Beam are you using?

There were some changes to the way we use typehints in the most recent Beam
release (2.21) that might be causing this [1]. If you're using 2.21 could
you try reverting to the old behavior (call
`apache_beam.typehints.disable_type_annotations()` before constructing the
pipeline) to see if that helps?
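
i.e. something like this, before the pipeline is built (pipeline_options
being whatever you already pass):

import apache_beam as beam

# Revert to the pre-2.21 behavior: don't infer typehints from
# Python 3 function annotations.
beam.typehints.disable_type_annotations()

with beam.Pipeline(options=pipeline_options) as p:
    ...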

Thanks,
Brian

[1] https://beam.apache.org/blog/python-typing/
