Posted to user@beam.apache.org by Jozef Vilcek <jo...@gmail.com> on 2019/03/14 14:04:28 UTC

"Dynamic" read / expand of PCollection element by IOs

Hello,

I wanted to write some Beam code which expands an incoming `PCollection<>`,
element-wise, by reusing existing IO components. An example would be a
`PCollection<ResourceId>` holding arbitrary paths to data, which I want to
load via `HadoopFormatIO.Read`, which is a `PTransform<PBegin,
PCollection<KV<...>>>`.

The problem is, I do not know how to do this, or whether it is possible at all.
1. I do not see a way to apply a `PTransform<PBegin, ...>` element-wise
(and hence reuse some existing IOs). Is it possible?

2. If I wanted to write such logic myself, is it doable in the Beam model?
Thanks,
Jozef
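
For concreteness, the mismatch being asked about can be sketched like this (a hypothetical snippet; the commented-out apply is the part with no direct equivalent, because `HadoopFormatIO.Read` can only be applied at `PBegin`):

```java
// What HadoopFormatIO offers: a whole-pipeline read rooted at PBegin.
PCollection<KV<LongWritable, Text>> rows = pipeline.apply(
    HadoopFormatIO.<LongWritable, Text>read().withConfiguration(hadoopConf));

// What the question asks for (no such API): applying that read per element.
// PCollection<ResourceId> paths = ...;
// paths.apply(HadoopFormatIO.<LongWritable, Text>read()); // does not type-check:
// Read's input type is PBegin, not PCollection<ResourceId>.
```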

Re: "Dynamic" read / expand of PCollection element by IOs

Posted by Jozef Vilcek <jo...@gmail.com>.
So the `PTransform` must be written in a way that invokes the underlying
`Source`s based on the incoming element, and a `PBegin`-rooted transform is not that.
Cool, makes sense to me. I will give it a try. Thanks


Re: "Dynamic" read / expand of PCollection element by IOs

Posted by Eugene Kirpichov <ki...@google.com>.
I would phrase it more optimistically :)
While there is no way to generically apply a PTransform<PBegin, ...>
element-wise, Beam does have a pattern / best practice for developing IO
connectors that can be applied element-wise - it's called "readAll", and some
IOs provide it, e.g. TextIO.readAll(), JdbcIO.readAll(), etc. Implementing
this does not necessarily require SDF; SDF is only necessary for streaming
use cases, or to get liquid sharding if the IO supports it. I think
implementing a readAll() version of HadoopFormatIO would not be difficult -
take a look at how the other ones are implemented and give it a shot!
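
The readAll pattern described above looks roughly like this with TextIO (a sketch against the Beam Java SDK of that era; TextIO.readAll() was later superseded by FileIO.matchAll() plus readMatches()):

```java
// Sketch: the "readAll" pattern, reading a PCollection of filepatterns
// element-wise (Beam Java SDK, 2.x releases current at the time of this thread).
PCollection<String> filepatterns =
    pipeline.apply(Create.of("hdfs://data/part-a/*", "hdfs://data/part-b/*"));

// Each incoming element is expanded into the lines of the files it matches.
PCollection<String> lines = filepatterns.apply(TextIO.readAll());
```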


Re: "Dynamic" read / expand of PCollection element by IOs

Posted by Lukasz Cwik <lc...@google.com>.
1)
You're looking for SplittableDoFn[1]. It is still in development, and a
conversion of all the IO connectors that exist today to be able to
consume a PCollection of resources is yet to come.
There are some limited use cases supported already, like FileIO.match[2];
if these fit your use case, then great.

2) Yes, for use cases that are not yet supported, people have just been using
ParDo and implementing the "IO" logic themselves.

1: https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
2:
https://github.com/apache/beam/blob/2ac5b764e3450798661a97f2b51f2d602feafb23/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L133
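
The custom-ParDo route in (2) amounts to doing the per-element read yourself inside processElement. Stripped of the Beam scaffolding, the core of such a DoFn is just plain per-path file reading; the class and method names here are illustrative, not from any existing IO:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;

public class ReadPerElement {

  // What a DoFn<String, String>'s processElement would do for one incoming
  // path element: open the resource and emit its records downstream.
  static List<String> expandOnePath(String path) throws IOException {
    return Files.readAllLines(Paths.get(path));
  }

  public static void main(String[] args) throws IOException {
    // Stand-in for one element of the incoming PCollection of paths.
    Path tmp = Files.createTempFile("read-per-element", ".txt");
    Files.write(tmp, Arrays.asList("record-1", "record-2"));
    System.out.println(expandOnePath(tmp.toString()));
  }
}
```

On a runner, the same body would sit inside a `DoFn` applied with `ParDo.of(...)`; the trade-off is that a plain ParDo gives up splitting and size-based work balancing, which is exactly what SDF is meant to restore.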

