Posted to user@beam.apache.org by Antony Mayi <an...@yahoo.com> on 2017/04/10 20:27:40 UTC

howto zip pcollection with index

Hi,
trying to implement a transformer PTransform<PCollection<T>, PCollection<KV<Long, T>>> where the output keys would be generated element indexes (increasing, unique). What's the best way to implement this before the beam stateful DoFn is available?
thx,a.

Re: howto zip pcollection with index

Posted by Robert Bradshaw <ro...@google.com>.
You can get uniqueness by inserting a group-by-key on a set of
arbitrary keys, and then using each key + position in its value list to
create a unique index, e.g.

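import random
import apache_beam as beam

indexed_pcollection = (
    input_pcollection
    # Assign each element one of N arbitrary keys.
    | beam.Map(lambda x: (random.randrange(N), x))
    | beam.GroupByKey()
    # k + N*ix is unique across keys because 0 <= k < N.
    | beam.FlatMap(lambda kv: ((kv[0] + N * ix, x) for ix, x in enumerate(kv[1]))))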

This will be perfectly dense if N=1, but processed sequentially.
Increasing N will increase parallelism at the expense of lower density
(though still pretty dense most of the time).
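
The key-plus-position scheme in the snippet above can be checked without a
runner. What follows is a minimal pure-Python sketch, not Beam code: the
grouping step is simulated with a dict, and index_with_random_keys and its
seed parameter are hypothetical names introduced only for illustration.

```python
import random
from collections import defaultdict


def index_with_random_keys(elements, n, seed=None):
    """Simulate Map -> GroupByKey -> FlatMap from the snippet in plain Python."""
    rng = random.Random(seed)
    groups = defaultdict(list)
    # Map: attach one of n arbitrary keys; GroupByKey: collect values per key.
    for x in elements:
        groups[rng.randrange(n)].append(x)
    # FlatMap: key + n * position is unique because 0 <= key < n.
    return [(k + n * ix, x) for k, xs in groups.items() for ix, x in enumerate(xs)]


indexed = index_with_random_keys(list("abcdefgh"), n=3, seed=0)
indices = [i for i, _ in indexed]

# With n=1 every element lands in the same group, so the indices are dense.
dense_indices = sorted(i for i, _ in index_with_random_keys(list("abcdefgh"), n=1))
```

With n > 1 the indices stay unique but may have gaps whenever the groups
end up with different sizes.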

Note that even if using stateful DoFns one would need to introduce
keys (and, implicitly, a group-by-key).


Re: howto zip pcollection with index

Posted by Ben Chambers <bc...@google.com>.
There isn't currently a great way of doing this, since in general it would
require single-threaded processing. Further, PCollections don't really have
a concept of order.

Could you explain more about your use case? Why do you need to zip elements
with their index?



Re: howto zip pcollection with index

Posted by Lukasz Cwik <lc...@google.com>.
If the PCollection<T> is small you can just convert it into a
PCollectionView<List<T>> using View.asList and then in another ParDo read
in this list as a side input and iterate over all the elements using the
index offset in the list.

To parallelize the above, you need to break up the List into ranges which
execute on independent machines:
PCollection<T> --> View.asList --> PCollectionView<List<T>>
Create.of("dummy element") -- read in the PCollectionView<List<T>> and
  partition the index space into somewhere between 1,000 and 1,000,000
  ranges --> PCollection<Range>
PCollection<Range> --> Reshuffle --> PCollection<Range> (breaks fusion so
  that the ranges are executed on multiple machines)
PCollection<Range> -- read in the PCollectionView<List<T>> and iterate over
  the specified range --> PCollection<KV<Long, T>>
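
The range-splitting idea above can be sketched in plain Python. This is only
an illustration of the logic, not Beam code: split_into_ranges and index_range
are hypothetical helpers, and the list stands in for the side input that each
worker would read.

```python
def split_into_ranges(size, num_ranges):
    """Split [0, size) into at most num_ranges contiguous (start, stop) ranges."""
    step = max(1, -(-size // num_ranges))  # ceiling division, at least 1
    return [(start, min(start + step, size)) for start in range(0, size, step)]


def index_range(side_input_list, index_range_):
    """What one worker would do: emit (index, element) for its own range."""
    start, stop = index_range_
    return [(i, side_input_list[i]) for i in range(start, stop)]


values = ["a", "b", "c", "d", "e", "f", "g"]
ranges = split_into_ranges(len(values), num_ranges=3)
# In a pipeline the ranges would be processed on different machines;
# here they are simply processed one after another.
indexed = [kv for r in ranges for kv in index_range(values, r)]
```

Because each range only needs the list and its own bounds, the ranges can be
processed independently and in any order.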

Dataflow uses the following trick to create the PCollectionView<List<T>>
and assign each element an index. Not all runners support large views,
which is why I explain the logic below, in case you use a runner that
doesn't support large side inputs:
PCollection<T> --- Shard the values based upon some deterministic property
of T ---> PCollection<KV<ShardKeyOfT, T>>
PCollection<KV<ShardKeyOfT, T>> --- Count.perKey --->
PCollection<KV<ShardKeyOfT, Long>>
PCollection<KV<ShardKeyOfT, Long>> --- View.asMap --->
PCollectionView<Map<ShardKeyOfT, Long>>

The PCollectionView<Map<ShardKeyOfT, Long>> maps each shard key of T to the
number of values that have that shard key. If you order the shard keys
of T and add up the counts, you can produce global offsets.
For example, let's say T is a non-empty byte array and you use the first
byte as the shard key; then the map will contain things like
0 --> 405
1 --> 57
2 --> 38
...
255 --> 87
which can be used to produce a global index offset per shard key map:
0 ---> 0
1 ---> 0+405
2 ---> 0+405+57
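
The offsets are a running sum of the counts taken in shard-key order. A small
sketch using only the three counts shown above; global_offsets is a
hypothetical helper name, and itertools.accumulate with initial= assumes
Python 3.8 or later.

```python
from itertools import accumulate


def global_offsets(counts_per_shard):
    """Map each shard key to the number of elements in all smaller shard keys."""
    keys = sorted(counts_per_shard)
    # Running totals starting at 0; zip drops the final grand total.
    running = accumulate((counts_per_shard[k] for k in keys), initial=0)
    return dict(zip(keys, running))


offsets = global_offsets({0: 405, 1: 57, 2: 38})
```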

Once you have this global index offset map, you can just group and iterate
over the elements:
PCollection<KV<ShardKeyOfT, T>> --- GroupByKey --->
PCollection<KV<ShardKeyOfT, Iterable<T>>> --- Assign incremental id using
global index offset ---> PCollection<KV<Long, T>>

You use this side input when processing PCollection<KV<ShardKeyOfT,
Iterable<T>>> and based upon your deterministic shard key of T, you can
count from the global offset in the map and just add 1 for each value in
the Iterable<T> that you see.
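
A rough plain-Python sketch of that last step, assuming the grouped values and
the offset map are already available. assign_ids and the sample data are
hypothetical; in a pipeline the offsets would arrive as the side input.

```python
def assign_ids(grouped, offsets):
    """For each (shard_key, values) group, count up from the shard's global offset."""
    for key, values in grouped:
        for i, value in enumerate(values):
            yield offsets[key] + i, value


# Hypothetical inputs: the GroupByKey output (first byte as shard key)
# and the global offset map built from the per-shard counts.
grouped = [(0, [b"\x00a", b"\x00d"]), (1, [b"\x01c"]), (2, [b"\x02b"])]
offsets = {0: 0, 1: 2, 2: 3}
indexed = list(assign_ids(grouped, offsets))
```

Each group only touches its own offset, so the groups can be processed in
parallel and still produce non-overlapping indices.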

Note that this is parallelizable only up to the number of shard keys, so
choose a reasonable number, such as more than 1,000 and less than one
million. Choosing too many will make the lookup map too large, and
ideally you want it to fit into memory completely. The sharding
distribution is important as well: you want it to be as even as possible.

Re: howto zip pcollection with index

Posted by Sourabh Bajaj <so...@google.com>.
Do you want the keys to be unique for a particular value of T or across the
entire collection?

I don't think the latter is possible but someone else can correct me.

For the first it is similar to a FlatMap so you can just do it within the
process function of your DoFn.
