Posted to user@beam.apache.org by Damian Akpan <da...@gmail.com> on 2022/07/14 14:38:59 UTC

Implementing a custom I/O Connector

Hi Everyone,

I've been working on implementing a Google Sheets IO source for my
pipeline. I followed this example
<https://beam.apache.org/documentation/programming-guide/#splittable-dofns>
along with this blog post <https://beam.apache.org/blog/splittable-do-fn/>.
I have an example here on Colab
<https://colab.research.google.com/drive/1Fbq2jDqGcfJv7639tb_7-FkbRkQPCPxg?usp=sharing>.
It fails with "AttributeError: 'PBegin' object has no attribute 'windowing'".

Please, what could I be doing wrong?

Re: Implementing a custom I/O Connector

Posted by Damian Akpan <da...@gmail.com>.
Okay, using beam.Impulse() solves it.

Thanks so much for your help.


Re: Implementing a custom I/O Connector

Posted by Chamikara Jayalath via user <us...@beam.apache.org>.
You cannot apply 'beam.ParDo' directly to the pipeline object.
Instead, feed the source description element to the ParDo through a
PCollection, for example:

    p | beam.Create([source_description]) | beam.ParDo(CountFn(10))

If the 'source_description' element is trivial (or gets ignored in the
source), you can replace 'beam.Create([source_description])' with
'beam.Impulse()'.

Thanks,
Cham



Re: Implementing a custom I/O Connector

Posted by Damian Akpan <da...@gmail.com>.
It fails with the error below.

After some looking around, I think the problem was that I treated the
Splittable DoFn as a regular DoFn, and there wasn't any PCollection in the
pipeline.

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-4-980dbb4e11c7> in <module>()
      3         p
      4         | "ProduceNumbers" >> beam.ParDo(CountFn(10))
----> 5       | "Print" >> beam.Map(print)
      6     )

14 frames
/usr/local/lib/python3.7/dist-packages/apache_beam/transforms/ptransform.py in get_windowing(self, inputs)
    560     """
    561     if inputs:
--> 562       return inputs[0].windowing
    563     else:
    564       from apache_beam.transforms.core import Windowing

AttributeError: 'PBegin' object has no attribute 'windowing'


NB: I have since edited the notebook, so the original revision is here:
<https://colab.research.google.com/drive/1ljtoEtyG0gwbq6SPTY1EHHmpdh6EHuxu#scrollTo=vaXnHuVOtEdG>


Re: Implementing a custom I/O Connector

Posted by Chamikara Jayalath via user <us...@beam.apache.org>.
Do you have the full stack trace?
Also, what does the Read() transform in the example entail?

Thanks,
Cham
