Posted to user@beam.apache.org by Josh <jo...@gmail.com> on 2017/05/24 08:05:39 UTC

How to partition a stream by key before writing with FileBasedSink?

Hi,

I am using a FileBasedSink (AvroIO.write) on an unbounded stream
(withWindowedWrites, hourly windows, numShards=4).

I would like to partition the stream by some key in the element, so that
all elements with the same key will get processed by the same shard writer,
and therefore written to the same file. Is there a way to do this? Note
that in my stream the number of keys is very large (most elements have a
unique key, while a few elements share a key).

Thanks,
Josh
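
For reference, a write configured as described might look roughly like the
following sketch (the element class MyElement, the output path and the
upstream triggering setup are placeholders, not taken from the actual
pipeline):

    // Assumes org.apache.beam.sdk.io.AvroIO, Window, FixedWindows and joda-time Duration.
    PCollection<MyElement> elements = ...;

    elements
        .apply(Window.<MyElement>into(FixedWindows.of(Duration.standardHours(1))))
        .apply(AvroIO.write(MyElement.class)
            .to("gs://my-bucket/output/avro/")   // placeholder path
            .withWindowedWrites()
            .withNumShards(4));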

Re: How to partition a stream by key before writing with FileBasedSink?

Posted by Lukasz Cwik <lc...@google.com>.
There is a lot of caching done to minimize how much is read, but for
fault-tolerance reasons writes are committed to an external store.
Internally in Dataflow we use state for a lot of operations to power
triggers, so it scales well and I wouldn't worry too much about how much
state you're dealing with until you notice a problem. This may or may not be
true for other runners though.

Unless you benchmark it or provide more concrete numbers around throughput
(state reads and writes per second per key), I can't provide much more
detail.
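
For readers following the thread, a rough sketch of the stateful DoFn
pattern being discussed is below. The element type, state id, coder and
merge logic are placeholders (not Josh's actual code); the point is only the
@StateId/ValueState read-then-write shape in processElement:

    // Sketch of a stateful DoFn that reads and writes per-key state in processElement.
    public class MyStatefulDoFn extends DoFn<KV<String, MyElement>, MyElement> {

      @StateId("lastSeen")
      private final StateSpec<ValueState<MyElement>> lastSeenSpec =
          StateSpecs.value(AvroCoder.of(MyElement.class));

      @ProcessElement
      public void processElement(ProcessContext c,
          @StateId("lastSeen") ValueState<MyElement> lastSeen) {
        MyElement previous = lastSeen.read();   // may be served from the runner's cache
        MyElement merged = merge(previous, c.element().getValue());
        lastSeen.write(merged);                 // committed externally for fault tolerance
        c.output(merged);
      }

      private MyElement merge(MyElement previous, MyElement current) {
        return current;  // placeholder merge logic; previous is null for the first element
      }
    }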

On Wed, Jun 7, 2017 at 11:14 AM, Josh <jo...@gmail.com> wrote:

> Hi Lukasz,
>
> Have just given this a go with the state API and stateful DoFn on the
> global window, as you suggested - it seems to work very well.
>
> I was just wondering how efficient it is when running on the Dataflow
> runner, if for example, several elements with the same key arrive within a
> few milliseconds of one another  e.g. (k1, v1), (k1, v2), (k1, v3) ... and
> in my stateful DoFn's processElement method I am reading and updating the
> state via state.read() and state.write(...). Is it reading and writing to
> an external store every time? Or is it doing all this in-memory? - I'm just
> wondering how it will scale for a larger volume stream.
>
> Thanks,
> Josh
>
>
>
> On Tue, Jun 6, 2017 at 11:18 PM, Josh <jo...@gmail.com> wrote:
>
>> Ok I see, thanks Lukasz. I will try this out tomorrow.
>>
>> Sorry for the confusing question!
>>
>> Josh
>>
>> On Tue, Jun 6, 2017 at 10:01 PM, Lukasz Cwik <lc...@google.com> wrote:
>>
>>> Based upon your descriptions, it seemed like you wanted limited
>>> parallelism because of an external dependency.
>>>
>>> Your best bet would be to use the global window combined with a
>>> StatefulDoFn. See this blog post (https://beam.apache.org/blog/
>>> 2017/02/13/stateful-processing.html) about the StatefulDoFn.
>>>
>>> You will not be able to use a different window function till after the
>>> StatefulDoFn otherwise a GroupByKey may schedule your work on a different
>>> machine since the windows for a key may differ.
>>>
>>> Source -> StatefulDoFn -> Window.into(my other window type)
>>>
>>> All our sources currently operate within the global window until a
>>> Window.into happens. So there is no need to do Source ->
>>> Window.into(GlobalWindow) -> StatefulDoFn -> Window.into(my other window
>>> type)
>>>
>>>
>>> On Tue, Jun 6, 2017 at 12:03 PM, <jo...@gmail.com> wrote:
>>>
>>>> Hmm ok, I don't quite get why what I want to do isn't supported in Beam
>>>> ... I don't actually have a limited parallelism requirement, I just want to
>>>> be able to partition my unbounded stream by a key determined from the
>>>> elements, so that any two elements with the same key will be routed to the
>>>> same worker. I want to do this because my DoFn keeps some in-memory cached
>>>> state for each key (which I was planning to store at either DoFn or JVM
>>>> level). Does this sound like a bad idea?
>>>>
>>>>
>>>> On 6 Jun 2017, at 19:14, Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>> You're right, the window acts as a secondary key within GroupByKey
>>>> (KeyA,Window1 != KeyA,Window2), which means that each of those two
>>>> composite keys can be scheduled to execute at the same time.
>>>>
>>>> At this point I think you should challenge your limited parallelism
>>>> requirement as you'll need to build something outside of Apache Beam to
>>>> provide these parallelization limits across windows (e.g. lock within the
>>>> same process when limiting yourself to a single machine, distributed lock
>>>> service when dealing with multiple machines).
>>>>
>>>> The backlog of data is either going to grow infinitely at the
>>>> GroupByKey or grow infinitely at the source if your pipeline can't keep up.
>>>> It is up to the Runner to be smart and not produce a giant backlog at the
>>>> GroupByKey since it knows how fast work is being completed (unfortunately I
>>>> don't know if any Runner is smart enough yet to push the backlog up to the
>>>> source).
>>>>
>>>> On Tue, Jun 6, 2017 at 11:03 AM, Josh <jo...@gmail.com> wrote:
>>>>
>>>>> I see, thanks for the tips!
>>>>>
>>>>> Last question about this! How could this be adapted to work in an
>>>>> unbounded/streaming job? To work in an unbounded job, I need to put a
>>>>> Window.into with a trigger before GroupByKey.
>>>>> I guess this would mean that the "shard gets processed by a single
>>>>> thread in MyDofn" guarantee will only apply to messages within a single
>>>>> window, and would not apply across windows?
>>>>> If this is the case, is there a better solution? I would like to avoid
>>>>> buffering data in windows, and want the shard guarantee to apply across
>>>>> windows.
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Jun 6, 2017 at 5:42 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> Your code looks like what I was describing. My only comment would be
>>>>>> to use a deterministic hashing function which is stable across JVM versions
>>>>>> and JVM instances as it will help in making your pipeline consistent across
>>>>>> different runs/environments.
>>>>>>
>>>>>> Parallelizing across 8 instances instead of 4 would break the
>>>>>> contract around GroupByKey (since it didn't group all the elements for a
>>>>>> key correctly). Also, each element is the smallest unit of work and
>>>>>> specifically in your pipeline you have chosen to reduce all your elements
>>>>>> into 4 logical elements (each containing some proportion of your original
>>>>>> data).
>>>>>>
>>>>>> On Tue, Jun 6, 2017 at 9:37 AM, Josh <jo...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks for the reply, Lukasz.
>>>>>>>
>>>>>>>
>>>>>>> What I meant was that I want to shard my data by a "shard key", and
>>>>>>> be sure that any two elements with the same "shard key" are processed by
>>>>>>> the same thread on the same worker. (Or if that's not possible, by the same
>>>>>>> worker JVM with no thread guarantee would be good enough). It doesn't
>>>>>>> actually matter to me whether there's 1 or 4 or 100 DoFn instances
>>>>>>> processing the data.
>>>>>>>
>>>>>>>
>>>>>>> It sounds like what you suggested will work for this, with the
>>>>>>> downside of me needing to choose a number of shards/DoFns (e.g. 4).
>>>>>>>
>>>>>>> It seems a bit long and messy but am I right in thinking it would
>>>>>>> look like this? ...
>>>>>>>
>>>>>>>
>>>>>>> PCollection<MyElement> elements = ...;
>>>>>>>
>>>>>>> elements
>>>>>>>     .apply(MapElements
>>>>>>>         .into(TypeDescriptors.kvs(TypeDescriptors.integers(),
>>>>>>>             TypeDescriptor.of(MyElement.class)))
>>>>>>>         .via((MyElement e) -> KV.of(
>>>>>>>             e.getKey().toString().hashCode() % 4, e)))
>>>>>>>     .apply(GroupByKey.create())
>>>>>>>     .apply(Partition.of(4,
>>>>>>>         (Partition.PartitionFn<KV<Integer, Iterable<MyElement>>>) (kv, i) -> kv.getKey()))
>>>>>>>     .apply(ParDo.of(new MyDofn()));
>>>>>>>
>>>>>>> // Where MyDofn must be changed to handle a KV<Integer, Iterable<MyElement>> as input instead of just a MyElement
>>>>>>>
>>>>>>>
>>>>>>> I was wondering is there a guarantee that the runner won't
>>>>>>> parallelise the final MyDofn across e.g. 8 instances instead of 4? If there
>>>>>>> are two input elements with the same key are they actually guaranteed to be
>>>>>>> processed on the same instance?
>>>>>>>
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Josh
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Jun 6, 2017 at 4:51 PM, Lukasz Cwik <lc...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I think this is what you're asking for, but your statement about 4
>>>>>>>> instances is unclear as to whether that is 4 copies of the same DoFn or 4
>>>>>>>> completely different DoFns. Also it's unclear what you mean by
>>>>>>>> instance/thread, I'm assuming that you want at most 4 instances of a DoFn
>>>>>>>> each being processed by a single thread.
>>>>>>>>
>>>>>>>> This is a bad idea because you limit your parallelism but this is
>>>>>>>> similar to what the default file sharding logic does. In Apache Beam the
>>>>>>>> smallest unit of output for a GroupByKey is a single key+iterable pair. We
>>>>>>>> exploit this by assigning all our values to a fixed number of keys and then
>>>>>>>> performing a GroupByKey. This is the same trick that powers the file
>>>>>>>> sharding logic in AvroIO/TextIO/...
>>>>>>>>
>>>>>>>> Your pipeline would look like (fixed width font diagram):
>>>>>>>> your data      -> apply shard key       -> GroupByKey        -> partition by key -> your dofn #1
>>>>>>>>                                                                                  \> your dofn #2
>>>>>>>>                                                                                  \> ...
>>>>>>>> a  / b / c / d -> 1,a / 2,b / 1,c / 2,d -> 1,[a,c] / 2,[b,d] -> ???
>>>>>>>>
>>>>>>>> This is not exactly the same as processing a single DoFn
>>>>>>>> instance/thread because it relies on the Runner to be able to schedule each
>>>>>>>> key to be processed on a different machine. For example a Runner may choose
>>>>>>>> to process value 1,[a,c] and 2,[b,d] sequentially on the same machine or
>>>>>>>> may choose to distribute them.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Jun 6, 2017 at 8:13 AM, Josh <jo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hey Lukasz,
>>>>>>>>>
>>>>>>>>> I have a follow up question about this -
>>>>>>>>>
>>>>>>>>> What if I want to do something very similar, but instead of with 4
>>>>>>>>> instances of AvroIO following the partition transform, I want 4 instances
>>>>>>>>> of a DoFn that I've written. I want to ensure that each partition is
>>>>>>>>> processed by a single DoFn instance/thread. Is this possible with Beam?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Josh
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, May 24, 2017 at 6:15 PM, Josh <jo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Ahh I see - Ok I'll try out this solution then. Thanks Lukasz!
>>>>>>>>>>
>>>>>>>>>> On Wed, May 24, 2017 at 5:20 PM, Lukasz Cwik <lc...@google.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Google Cloud Dataflow won't override your setting. The dynamic
>>>>>>>>>>> sharding occurs if you don't explicitly set a numShard value.
>>>>>>>>>>>
>>>>>>>>>>> On Wed, May 24, 2017 at 9:14 AM, Josh <jo...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Lukasz,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for the example. That sounds like a nice solution -
>>>>>>>>>>>> I am running on Dataflow though, which dynamically sets
>>>>>>>>>>>> numShards - so if I set numShards to 1 on each of those AvroIO writers, I
>>>>>>>>>>>> can't be sure that Dataflow isn't going to override my setting right? I
>>>>>>>>>>>> guess this should work fine as long as I partition my stream into a large
>>>>>>>>>>>> enough number of partitions so that Dataflow won't override numShards.
>>>>>>>>>>>>
>>>>>>>>>>>> Josh
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, May 24, 2017 at 4:10 PM, Lukasz Cwik <lc...@google.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Since you're using a small number of shards, add a Partition
>>>>>>>>>>>>> transform which uses a deterministic hash of the key to choose one of 4
>>>>>>>>>>>>> partitions. Write each partition with a single shard.
>>>>>>>>>>>>>
>>>>>>>>>>>>> (Fixed width diagram below)
>>>>>>>>>>>>> Pipeline -> AvroIO(numShards = 4)
>>>>>>>>>>>>> Becomes:
>>>>>>>>>>>>> Pipeline -> Partition --> AvroIO(numShards = 1)
>>>>>>>>>>>>>                       |-> AvroIO(numShards = 1)
>>>>>>>>>>>>>                       |-> AvroIO(numShards = 1)
>>>>>>>>>>>>>                       \-> AvroIO(numShards = 1)
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, May 24, 2017 at 1:05 AM, Josh <jo...@gmail.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I am using a FileBasedSink (AvroIO.write) on an unbounded
>>>>>>>>>>>>>> stream (withWindowedWrites, hourly windows, numShards=4).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I would like to partition the stream by some key in the
>>>>>>>>>>>>>> element, so that all elements with the same key will get processed by the
>>>>>>>>>>>>>> same shard writer, and therefore written to the same file. Is there a way
>>>>>>>>>>>>>> to do this? Note that in my stream the number of keys is very large (most
>>>>>>>>>>>>>> elements have a unique key, while a few elements share a key).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Josh
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: How to partition a stream by key before writing with FileBasedSink?

Posted by Josh <jo...@gmail.com>.
Hi Lukasz,

I've just given this a go with the state API and a stateful DoFn on the
global window, as you suggested - it seems to work very well.

I was just wondering how efficient it is when running on the Dataflow
runner if, for example, several elements with the same key arrive within a
few milliseconds of one another, e.g. (k1, v1), (k1, v2), (k1, v3) ..., and
in my stateful DoFn's processElement method I am reading and updating the
state via state.read() and state.write(...). Is it reading from and writing
to an external store every time, or is it doing all this in memory? I'm just
wondering how it will scale for a higher-volume stream.

Thanks,
Josh



On Tue, Jun 6, 2017 at 11:18 PM, Josh <jo...@gmail.com> wrote:

> Ok I see, thanks Lukasz. I will try this out tomorrow.
>
> Sorry for the confusing question!
>
> Josh
>
> On Tue, Jun 6, 2017 at 10:01 PM, Lukasz Cwik <lc...@google.com> wrote:
>
>> Based upon your descriptions, it seemed like you wanted limited
>> parallelism because of an external dependency.
>>
>> Your best bet would be to use the global window combined with a
>> StatefulDoFn. See this blog post (https://beam.apache.org/blog/
>> 2017/02/13/stateful-processing.html) about the StatefulDoFn.
>>
>> You will not be able to use a different window function till after the
>> StatefulDoFn otherwise a GroupByKey may schedule your work on a different
>> machine since the windows for a key may differ.
>>
>> Source -> StatefulDoFn -> Window.into(my other window type)
>>
>> All our sources currently operate within the global window until a
>> Window.into happens. So there is no need to do Source ->
>> Window.into(GlobalWindow) -> StatefulDoFn -> Window.into(my other window
>> type)
>>
>>
>> On Tue, Jun 6, 2017 at 12:03 PM, <jo...@gmail.com> wrote:
>>
>>> Hmm ok, I don't quite get why what I want to do isn't supported in Beam
>>> ... I don't actually have a limited parallelism requirement, I just want to
>>> be able to partition my unbounded stream by a key determined from the
>>> elements, so that any two elements with the same key will be routed to the
>>> same worker. I want to do this because my DoFn keeps some in-memory cached
>>> state for each key (which I was planning to store at either DoFn or JVM
>>> level). Does this sound like a bad idea?
>>>
>>>
>>> On 6 Jun 2017, at 19:14, Lukasz Cwik <lc...@google.com> wrote:
>>>
>>> You're right, the window acts as a secondary key within GroupByKey
>>> (KeyA,Window1 != KeyA,Window2), which means that each of those two
>>> composite keys can be scheduled to execute at the same time.
>>>
>>> At this point I think you should challenge your limited parallelism
>>> requirement as you'll need to build something outside of Apache Beam to
>>> provide these parallelization limits across windows (e.g. lock within the
>>> same process when limiting yourself to a single machine, distributed lock
>>> service when dealing with multiple machines).
>>>
>>> The backlog of data is either going to grow infinitely at the GroupByKey
>>> or grow infinitely at the source if your pipeline can't keep up. It is up
>>> to the Runner to be smart and not produce a giant backlog at the GroupByKey
>>> since it knows how fast work is being completed (unfortunately I don't know
>>> if any Runner is smart enough yet to push the backlog up to the source).
>>>
>>> On Tue, Jun 6, 2017 at 11:03 AM, Josh <jo...@gmail.com> wrote:
>>>
>>>> I see, thanks for the tips!
>>>>
>>>> Last question about this! How could this be adapted to work in an
>>>> unbounded/streaming job? To work in an unbounded job, I need to put a
>>>> Window.into with a trigger before GroupByKey.
>>>> I guess this would mean that the "shard gets processed by a single
>>>> thread in MyDofn" guarantee will only apply to messages within a single
>>>> window, and would not apply across windows?
>>>> If this is the case, is there a better solution? I would like to avoid
>>>> buffering data in windows, and want the shard guarantee to apply across
>>>> windows.
>>>>
>>>>
>>>>
>>>> On Tue, Jun 6, 2017 at 5:42 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> Your code looks like what I was describing. My only comment would be
>>>>> to use a deterministic hashing function which is stable across JVM versions
>>>>> and JVM instances as it will help in making your pipeline consistent across
>>>>> different runs/environments.
>>>>>
>>>>> Parallelizing across 8 instances instead of 4 would break the contract
>>>>> around GroupByKey (since it didn't group all the elements for a key
>>>>> correctly). Also, each element is the smallest unit of work and
>>>>> specifically in your pipeline you have chosen to reduce all your elements
>>>>> into 4 logical elements (each containing some proportion of your original
>>>>> data).
>>>>>
>>>>> On Tue, Jun 6, 2017 at 9:37 AM, Josh <jo...@gmail.com> wrote:
>>>>>
>>>>>> Thanks for the reply, Lukasz.
>>>>>>
>>>>>>
>>>>>> What I meant was that I want to shard my data by a "shard key", and
>>>>>> be sure that any two elements with the same "shard key" are processed by
>>>>>> the same thread on the same worker. (Or if that's not possible, by the same
>>>>>> worker JVM with no thread guarantee would be good enough). It doesn't
>>>>>> actually matter to me whether there's 1 or 4 or 100 DoFn instances
>>>>>> processing the data.
>>>>>>
>>>>>>
>>>>>> It sounds like what you suggested will work for this, with the
>>>>>> downside of me needing to choose a number of shards/DoFns (e.g. 4).
>>>>>>
>>>>>> It seems a bit long and messy but am I right in thinking it would
>>>>>> look like this? ...
>>>>>>
>>>>>>
>>>>>> PCollection<MyElement> elements = ...;
>>>>>>
>>>>>> elements
>>>>>>     .apply(MapElements
>>>>>>         .into(TypeDescriptors.kvs(TypeDescriptors.integers(),
>>>>>>             TypeDescriptor.of(MyElement.class)))
>>>>>>         .via((MyElement e) -> KV.of(
>>>>>>             e.getKey().toString().hashCode() % 4, e)))
>>>>>>     .apply(GroupByKey.create())
>>>>>>     .apply(Partition.of(4,
>>>>>>         (Partition.PartitionFn<KV<Integer, Iterable<MyElement>>>) (kv, i) -> kv.getKey()))
>>>>>>     .apply(ParDo.of(new MyDofn()));
>>>>>>
>>>>>> // Where MyDofn must be changed to handle a KV<Integer, Iterable<MyElement>> as input instead of just a MyElement
>>>>>>
>>>>>>
>>>>>> I was wondering is there a guarantee that the runner won't
>>>>>> parallelise the final MyDofn across e.g. 8 instances instead of 4? If there
>>>>>> are two input elements with the same key are they actually guaranteed to be
>>>>>> processed on the same instance?
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Josh
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Jun 6, 2017 at 4:51 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>>>
>>>>>>> I think this is what you're asking for, but your statement about 4
>>>>>>> instances is unclear as to whether that is 4 copies of the same DoFn or 4
>>>>>>> completely different DoFns. Also it's unclear what you mean by
>>>>>>> instance/thread, I'm assuming that you want at most 4 instances of a DoFn
>>>>>>> each being processed by a single thread.
>>>>>>>
>>>>>>> This is a bad idea because you limit your parallelism but this is
>>>>>>> similar to what the default file sharding logic does. In Apache Beam the
>>>>>>> smallest unit of output for a GroupByKey is a single key+iterable pair. We
>>>>>>> exploit this by assigning all our values to a fixed number of keys and then
>>>>>>> performing a GroupByKey. This is the same trick that powers the file
>>>>>>> sharding logic in AvroIO/TextIO/...
>>>>>>>
>>>>>>> Your pipeline would look like (fixed width font diagram):
>>>>>>> your data      -> apply shard key       -> GroupByKey        -> partition by key -> your dofn #1
>>>>>>>                                                                                  \> your dofn #2
>>>>>>>                                                                                  \> ...
>>>>>>> a  / b / c / d -> 1,a / 2,b / 1,c / 2,d -> 1,[a,c] / 2,[b,d] -> ???
>>>>>>>
>>>>>>> This is not exactly the same as processing a single DoFn
>>>>>>> instance/thread because it relies on the Runner to be able to schedule each
>>>>>>> key to be processed on a different machine. For example a Runner may choose
>>>>>>> to process value 1,[a,c] and 2,[b,d] sequentially on the same machine or
>>>>>>> may choose to distribute them.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Jun 6, 2017 at 8:13 AM, Josh <jo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hey Lukasz,
>>>>>>>>
>>>>>>>> I have a follow up question about this -
>>>>>>>>
>>>>>>>> What if I want to do something very similar, but instead of with 4
>>>>>>>> instances of AvroIO following the partition transform, I want 4 instances
>>>>>>>> of a DoFn that I've written. I want to ensure that each partition is
>>>>>>>> processed by a single DoFn instance/thread. Is this possible with Beam?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Josh
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, May 24, 2017 at 6:15 PM, Josh <jo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Ahh I see - Ok I'll try out this solution then. Thanks Lukasz!
>>>>>>>>>
>>>>>>>>> On Wed, May 24, 2017 at 5:20 PM, Lukasz Cwik <lc...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Google Cloud Dataflow won't override your setting. The dynamic
>>>>>>>>>> sharding occurs if you don't explicitly set a numShard value.
>>>>>>>>>>
>>>>>>>>>> On Wed, May 24, 2017 at 9:14 AM, Josh <jo...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Lukasz,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for the example. That sounds like a nice solution -
>>>>>>>>>>> I am running on Dataflow though, which dynamically sets
>>>>>>>>>>> numShards - so if I set numShards to 1 on each of those AvroIO writers, I
>>>>>>>>>>> can't be sure that Dataflow isn't going to override my setting right? I
>>>>>>>>>>> guess this should work fine as long as I partition my stream into a large
>>>>>>>>>>> enough number of partitions so that Dataflow won't override numShards.
>>>>>>>>>>>
>>>>>>>>>>> Josh
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, May 24, 2017 at 4:10 PM, Lukasz Cwik <lc...@google.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Since you're using a small number of shards, add a Partition
>>>>>>>>>>>> transform which uses a deterministic hash of the key to choose one of 4
>>>>>>>>>>>> partitions. Write each partition with a single shard.
>>>>>>>>>>>>
>>>>>>>>>>>> (Fixed width diagram below)
>>>>>>>>>>>> Pipeline -> AvroIO(numShards = 4)
>>>>>>>>>>>> Becomes:
>>>>>>>>>>>> Pipeline -> Partition --> AvroIO(numShards = 1)
>>>>>>>>>>>>                       |-> AvroIO(numShards = 1)
>>>>>>>>>>>>                       |-> AvroIO(numShards = 1)
>>>>>>>>>>>>                       \-> AvroIO(numShards = 1)
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, May 24, 2017 at 1:05 AM, Josh <jo...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I am using a FileBasedSink (AvroIO.write) on an unbounded
>>>>>>>>>>>>> stream (withWindowedWrites, hourly windows, numShards=4).
>>>>>>>>>>>>>
>>>>>>>>>>>>> I would like to partition the stream by some key in the
>>>>>>>>>>>>> element, so that all elements with the same key will get processed by the
>>>>>>>>>>>>> same shard writer, and therefore written to the same file. Is there a way
>>>>>>>>>>>>> to do this? Note that in my stream the number of keys is very large (most
>>>>>>>>>>>>> elements have a unique key, while a few elements share a key).
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Josh
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: How to partition a stream by key before writing with FileBasedSink?

Posted by Josh <jo...@gmail.com>.
Ok I see, thanks Lukasz. I will try this out tomorrow.

Sorry for the confusing question!

Josh

On Tue, Jun 6, 2017 at 10:01 PM, Lukasz Cwik <lc...@google.com> wrote:

> Based upon your descriptions, it seemed like you wanted limited
> parallelism because of an external dependency.
>
> Your best bet would be to use the global window combined with a
> StatefulDoFn. See this blog post (https://beam.apache.org/blog/
> 2017/02/13/stateful-processing.html) about the StatefulDoFn.
>
> You will not be able to use a different window function till after the
> StatefulDoFn otherwise a GroupByKey may schedule your work on a different
> machine since the windows for a key may differ.
>
> Source -> StatefulDoFn -> Window.into(my other window type)
>
> All our sources currently operate within the global window until a
> Window.into happens. So there is no need to do Source ->
> Window.into(GlobalWindow) -> StatefulDoFn -> Window.into(my other window
> type)
>
>
> On Tue, Jun 6, 2017 at 12:03 PM, <jo...@gmail.com> wrote:
>
>> Hmm ok, I don't quite get why what I want to do isn't supported in Beam
>> ... I don't actually have a limited parallelism requirement, I just want to
>> be able to partition my unbounded stream by a key determined from the
>> elements, so that any two elements with the same key will be routed to the
>> same worker. I want to do this because my DoFn keeps some in-memory cached
>> state for each key (which I was planning to store at either DoFn or JVM
>> level). Does this sound like a bad idea?
>>
>>
>> On 6 Jun 2017, at 19:14, Lukasz Cwik <lc...@google.com> wrote:
>>
>> You're right, the window acts as a secondary key within GroupByKey
>> (KeyA,Window1 != KeyA,Window2), which means that each of those two
>> composite keys can be scheduled to execute at the same time.
>>
>> At this point I think you should challenge your limited parallelism
>> requirement as you'll need to build something outside of Apache Beam to
>> provide these parallelization limits across windows (e.g. lock within the
>> same process when limiting yourself to a single machine, distributed lock
>> service when dealing with multiple machines).
>>
>> The backlog of data is either going to grow infinitely at the GroupByKey
>> or grow infinitely at the source if your pipeline can't keep up. It is up
>> to the Runner to be smart and not produce a giant backlog at the GroupByKey
>> since it knows how fast work is being completed (unfortunately I don't know
>> if any Runner is smart enough yet to push the backlog up to the source).
>>
>> On Tue, Jun 6, 2017 at 11:03 AM, Josh <jo...@gmail.com> wrote:
>>
>>> I see, thanks for the tips!
>>>
>>> Last question about this! How could this be adapted to work in an
>>> unbounded/streaming job? To work in an unbounded job, I need to put a
>>> Window.into with a trigger before GroupByKey.
>>> I guess this would mean that the "shard gets processed by a single
>>> thread in MyDofn" guarantee will only apply to messages within a single
>>> window, and would not apply across windows?
>>> If this is the case, is there a better solution? I would like to avoid
>>> buffering data in windows, and want the shard guarantee to apply across
>>> windows.
>>>
>>>
>>>
>>> On Tue, Jun 6, 2017 at 5:42 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> Your code looks like what I was describing. My only comment would be to
>>>> use a deterministic hashing function which is stable across JVM versions
>>>> and JVM instances as it will help in making your pipeline consistent across
>>>> different runs/environments.
>>>>
>>>> Parallelizing across 8 instances instead of 4 would break the contract
>>>> around GroupByKey (since it didn't group all the elements for a key
>>>> correctly). Also, each element is the smallest unit of work and
>>>> specifically in your pipeline you have chosen to reduce all your elements
>>>> into 4 logical elements (each containing some proportion of your original
>>>> data).
>>>>
>>>> On Tue, Jun 6, 2017 at 9:37 AM, Josh <jo...@gmail.com> wrote:
>>>>
>>>>> Thanks for the reply, Lukasz.
>>>>>
>>>>>
>>>>> What I meant was that I want to shard my data by a "shard key", and be
>>>>> sure that any two elements with the same "shard key" are processed by the
>>>>> same thread on the same worker. (Or if that's not possible, by the same
>>>>> worker JVM with no thread guarantee would be good enough). It doesn't
>>>>> actually matter to me whether there's 1 or 4 or 100 DoFn instances
>>>>> processing the data.
>>>>>
>>>>>
>>>>> It sounds like what you suggested will work for this, with the
>>>>> downside of me needing to choose a number of shards/DoFns (e.g. 4).
>>>>>
>>>>> It seems a bit long and messy but am I right in thinking it would look
>>>>> like this? ...
>>>>>
>>>>>
>>>>> PCollection<MyElement> elements = ...;
>>>>>
>>>>> elements
>>>>>     .apply(MapElements
>>>>>         .into(TypeDescriptors.kvs(TypeDescriptors.integers(),
>>>>>             TypeDescriptor.of(MyElement.class)))
>>>>>         .via((MyElement e) -> KV.of(
>>>>>             e.getKey().toString().hashCode() % 4, e)))
>>>>>     .apply(GroupByKey.create())
>>>>>     .apply(Partition.of(4,
>>>>>         (Partition.PartitionFn<KV<Integer, Iterable<MyElement>>>) (kv, i) -> kv.getKey()))
>>>>>     .apply(ParDo.of(new MyDofn()));
>>>>>
>>>>> // Where MyDofn must be changed to handle a KV<Integer, Iterable<MyElement>> as input instead of just a MyElement
>>>>>
>>>>>
>>>>> I was wondering is there a guarantee that the runner won't parallelise
>>>>> the final MyDofn across e.g. 8 instances instead of 4? If there are two
>>>>> input elements with the same key are they actually guaranteed to be
>>>>> processed on the same instance?
>>>>>
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Josh
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Jun 6, 2017 at 4:51 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> I think this is what you're asking for, but your statement about 4
>>>>>> instances is unclear as to whether that is 4 copies of the same DoFn or 4
>>>>>> completely different DoFns. Also it's unclear what you mean by
>>>>>> instance/thread, I'm assuming that you want at most 4 instances of a DoFn
>>>>>> each being processed by a single thread.
>>>>>>
>>>>>> This is a bad idea because you limit your parallelism but this is
>>>>>> similar to what the default file sharding logic does. In Apache Beam the
>>>>>> smallest unit of output for a GroupByKey is a single key+iterable pair. We
>>>>>> exploit this by assigning all our values to a fixed number of keys and then
>>>>>> performing a GroupByKey. This is the same trick that powers the file
>>>>>> sharding logic in AvroIO/TextIO/...
>>>>>>
>>>>>> Your pipeline would look like (fixed width font diagram):
>>>>>> your data      -> apply shard key       -> GroupByKey        -> partition by key -> your dofn #1
>>>>>>                                                                                  \> your dofn #2
>>>>>>                                                                                  \> ...
>>>>>> a  / b / c / d -> 1,a / 2,b / 1,c / 2,d -> 1,[a,c] / 2,[b,d] -> ???
>>>>>>
>>>>>> This is not exactly the same as processing a single DoFn
>>>>>> instance/thread because it relies on the Runner to be able to schedule each
>>>>>> key to be processed on a different machine. For example a Runner may choose
>>>>>> to process value 1,[a,c] and 2,[b,d] sequentially on the same machine or
>>>>>> may choose to distribute them.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Jun 6, 2017 at 8:13 AM, Josh <jo...@gmail.com> wrote:
>>>>>>
>>>>>>> Hey Lukasz,
>>>>>>>
>>>>>>> I have a follow up question about this -
>>>>>>>
>>>>>>> What if I want to do something very similar, but instead of with 4
>>>>>>> instances of AvroIO following the partition transform, I want 4 instances
>>>>>>> of a DoFn that I've written. I want to ensure that each partition is
>>>>>>> processed by a single DoFn instance/thread. Is this possible with Beam?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Josh
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, May 24, 2017 at 6:15 PM, Josh <jo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Ahh I see - Ok I'll try out this solution then. Thanks Lukasz!
>>>>>>>>
>>>>>>>> On Wed, May 24, 2017 at 5:20 PM, Lukasz Cwik <lc...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Google Cloud Dataflow won't override your setting. The dynamic
>>>>>>>>> sharding occurs if you don't explicitly set a numShard value.
>>>>>>>>>
>>>>>>>>> On Wed, May 24, 2017 at 9:14 AM, Josh <jo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Lukasz,
>>>>>>>>>>
>>>>>>>>>> Thanks for the example. That sounds like a nice solution -
>>>>>>>>>> I am running on Dataflow though, which dynamically sets numShards
>>>>>>>>>> - so if I set numShards to 1 on each of those AvroIO writers, I can't be
>>>>>>>>>> sure that Dataflow isn't going to override my setting right? I guess this
>>>>>>>>>> should work fine as long as I partition my stream into a large enough
>>>>>>>>>> number of partitions so that Dataflow won't override numShards.
>>>>>>>>>>
>>>>>>>>>> Josh
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, May 24, 2017 at 4:10 PM, Lukasz Cwik <lc...@google.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Since you're using a small number of shards, add a Partition
>>>>>>>>>>> transform which uses a deterministic hash of the key to choose one of 4
>>>>>>>>>>> partitions. Write each partition with a single shard.
>>>>>>>>>>>
>>>>>>>>>>> (Fixed width diagram below)
>>>>>>>>>>> Pipeline -> AvroIO(numShards = 4)
>>>>>>>>>>> Becomes:
>>>>>>>>>>> Pipeline -> Partition --> AvroIO(numShards = 1)
>>>>>>>>>>>                       |-> AvroIO(numShards = 1)
>>>>>>>>>>>                       |-> AvroIO(numShards = 1)
>>>>>>>>>>>                       \-> AvroIO(numShards = 1)
>>>>>>>>>>>
>>>>>>>>>>> On Wed, May 24, 2017 at 1:05 AM, Josh <jo...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> I am using a FileBasedSink (AvroIO.write) on an unbounded
>>>>>>>>>>>> stream (withWindowedWrites, hourly windows, numShards=4).
>>>>>>>>>>>>
>>>>>>>>>>>> I would like to partition the stream by some key in the
>>>>>>>>>>>> element, so that all elements with the same key will get processed by the
>>>>>>>>>>>> same shard writer, and therefore written to the same file. Is there a way
>>>>>>>>>>>> to do this? Note that in my stream the number of keys is very large (most
>>>>>>>>>>>> elements have a unique key, while a few elements share a key).
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Josh
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: How to partition a stream by key before writing with FileBasedSink?

Posted by Lukasz Cwik <lc...@google.com>.
Based upon your descriptions, it seemed like you wanted limited parallelism
because of an external dependency.

Your best bet would be to use the global window combined with a
StatefulDoFn. See this blog post (
https://beam.apache.org/blog/2017/02/13/stateful-processing.html) about the
StatefulDoFn.

You will not be able to use a different window function until after the
StatefulDoFn; otherwise a GroupByKey may schedule your work on a different
machine, since the windows for a key may differ.

Source -> StatefulDoFn -> Window.into(my other window type)

All our sources currently operate within the global window until a
Window.into happens. So there is no need to do Source ->
Window.into(GlobalWindow) -> StatefulDoFn -> Window.into(my other window
type)
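
Under the assumption that the elements are keyed by the shard key first
(state is per key), that shape might look roughly like the sketch below.
MyStatefulDoFn stands for the stateful DoFn discussed here (taking
KV<String, MyElement> input); the hourly window is just an example of
"my other window type":

    PCollection<MyElement> source = ...;  // unbounded source, still in the global window

    source
        // Key by the shard key so the stateful DoFn keeps state per key.
        .apply(MapElements
            .into(TypeDescriptors.kvs(TypeDescriptors.strings(),
                TypeDescriptor.of(MyElement.class)))
            .via((MyElement e) -> KV.of(e.getKey().toString(), e)))
        // Stateful DoFn runs directly on the global window.
        .apply(ParDo.of(new MyStatefulDoFn()))
        // Only after the stateful DoFn, apply the other windowing for the downstream write.
        .apply(Window.<MyElement>into(FixedWindows.of(Duration.standardHours(1))));
        // ... windowed AvroIO write (or any other sink) follows here.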


On Tue, Jun 6, 2017 at 12:03 PM, <jo...@gmail.com> wrote:

> Hmm ok, I don't quite get why what I want to do isn't supported in Beam
> ... I don't actually have a limited parallelism requirement, I just want to
> be able to partition my unbounded stream by a key determined from the
> elements, so that any two elements with the same key will be routed to the
> same worker. I want to do this because my DoFn keeps some in-memory cached
> state for each key (which I was planning to store at either DoFn or JVM
> level). Does this sound like a bad idea?
>
>
> On 6 Jun 2017, at 19:14, Lukasz Cwik <lc...@google.com> wrote:
>
> You're right, the window acts as a secondary key within GroupByKey
> (KeyA,Window1 != KeyA,Window2), which means that each of those two
> composite keys can be scheduled to execute at the same time.
>
> At this point I think you should challenge your limited parallelism
> requirement as you'll need to build something outside of Apache Beam to
> provide these parallelization limits across windows (e.g. lock within the
> same process when limiting yourself to a single machine, distributed lock
> service when dealing with multiple machines).
>
> The backlog of data is either going to grow infinitely at the GroupByKey
> or grow infinitely at the source if your pipeline can't keep up. It is up
> to the Runner to be smart and not produce a giant backlog at the GroupByKey
> since it knows how fast work is being completed (unfortunately I don't know
> if any Runner is smart enough yet to push the backlog up to the source).
>
> On Tue, Jun 6, 2017 at 11:03 AM, Josh <jo...@gmail.com> wrote:
>
>> I see, thanks for the tips!
>>
>> Last question about this! How could this be adapted to work in an
>> unbounded/streaming job? To work in an unbounded job, I need to put a
>> Window.into with a trigger before GroupByKey.
>> I guess this would mean that the "shard gets processed by a single thread
>> in MyDofn" guarantee will only apply to messages within a single window,
>> and would not apply across windows?
>> If this is the case, is there a better solution? I would like to avoid
>> buffering data in windows, and want the shard guarantee to apply across
>> windows.
>>
>>
>>
>> On Tue, Jun 6, 2017 at 5:42 PM, Lukasz Cwik <lc...@google.com> wrote:
>>
>>> Your code looks like what I was describing. My only comment would be to
>>> use a deterministic hashing function which is stable across JVM versions
>>> and JVM instances as it will help in making your pipeline consistent across
>>> different runs/environments.
>>>
>>> Parallelizing across 8 instances instead of 4 would break the contract
>>> around GroupByKey (since it didn't group all the elements for a key
>>> correctly). Also, each element is the smallest unit of work and
>>> specifically in your pipeline you have chosen to reduce all your elements
>>> into 4 logical elements (each containing some proportion of your original
>>> data).
>>>
>>> On Tue, Jun 6, 2017 at 9:37 AM, Josh <jo...@gmail.com> wrote:
>>>
>>>> Thanks for the reply, Lukasz.
>>>>
>>>>
>>>> What I meant was that I want to shard my data by a "shard key", and be
>>>> sure that any two elements with the same "shard key" are processed by the
>>>> same thread on the same worker. (Or if that's not possible, by the same
>>>> worker JVM with no thread guarantee would be good enough). It doesn't
>>>> actually matter to me whether there's 1 or 4 or 100 DoFn instances
>>>> processing the data.
>>>>
>>>>
>>>> It sounds like what you suggested will work for this, with the downside
>>>> of me needing to choose a number of shards/DoFns (e.g. 4).
>>>>
>>>> It seems a bit long and messy but am I right in thinking it would look
>>>> like this? ...
>>>>
>>>>
>>>> PCollection<MyElement> elements = ...;
>>>>
>>>> elements
>>>>     .apply(MapElements
>>>>         .into(TypeDescriptors.kvs(TypeDescriptors.integers(),
>>>>             TypeDescriptor.of(MyElement.class)))
>>>>         .via((MyElement e) -> KV.of(
>>>>             e.getKey().toString().hashCode() % 4, e)))
>>>>     .apply(GroupByKey.create())
>>>>     .apply(Partition.of(4,
>>>>         (Partition.PartitionFn<KV<Integer, Iterable<MyElement>>>) (kv, i) -> kv.getKey()))
>>>>     .apply(ParDo.of(new MyDofn()));
>>>>
>>>> // Where MyDofn must be changed to handle a KV<Integer, Iterable<MyElement>> as input instead of just a MyElement
>>>>
>>>>
>>>> I was wondering is there a guarantee that the runner won't parallelise
>>>> the final MyDofn across e.g. 8 instances instead of 4? If there are two
>>>> input elements with the same key are they actually guaranteed to be
>>>> processed on the same instance?
>>>>
>>>>
>>>> Thanks,
>>>>
>>>> Josh
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, Jun 6, 2017 at 4:51 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> I think this is what you're asking for, but your statement about 4
>>>>> instances is unclear as to whether that is 4 copies of the same DoFn or 4
>>>>> completely different DoFns. Also it's unclear what you mean by
>>>>> instance/thread, I'm assuming that you want at most 4 instances of a DoFn
>>>>> each being processed by a single thread.
>>>>>
>>>>> This is a bad idea because you limit your parallelism but this is
>>>>> similar to what the default file sharding logic does. In Apache Beam the
>>>>> smallest unit of output for a GroupByKey is a single key+iterable pair. We
>>>>> exploit this by assigning all our values to a fixed number of keys and then
>>>>> performing a GroupByKey. This is the same trick that powers the file
>>>>> sharding logic in AvroIO/TextIO/...
>>>>>
>>>>> Your pipeline would look like (fixed width font diagram):
>>>>> your data      -> apply shard key       -> GroupByKey        -> partition by key -> your dofn #1
>>>>>                                                                                  \> your dofn #2
>>>>>                                                                                  \> ...
>>>>> a  / b / c / d -> 1,a / 2,b / 1,c / 2,d -> 1,[a,c] / 2,[b,d] -> ???
>>>>>
>>>>> This is not exactly the same as processing a single DoFn
>>>>> instance/thread because it relies on the Runner to be able to schedule each
>>>>> key to be processed on a different machine. For example a Runner may choose
>>>>> to process value 1,[a,c] and 2,[b,d] sequentially on the same machine or
>>>>> may choose to distribute them.
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Jun 6, 2017 at 8:13 AM, Josh <jo...@gmail.com> wrote:
>>>>>
>>>>>> Hey Lukasz,
>>>>>>
>>>>>> I have a follow up question about this -
>>>>>>
>>>>>> What if I want to do something very similar, but instead of with 4
>>>>>> instances of AvroIO following the partition transform, I want 4 instances
>>>>>> of a DoFn that I've written. I want to ensure that each partition is
>>>>>> processed by a single DoFn instance/thread. Is this possible with Beam?
>>>>>>
>>>>>> Thanks,
>>>>>> Josh
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, May 24, 2017 at 6:15 PM, Josh <jo...@gmail.com> wrote:
>>>>>>
>>>>>>> Ahh I see - Ok I'll try out this solution then. Thanks Lukasz!
>>>>>>>
>>>>>>> On Wed, May 24, 2017 at 5:20 PM, Lukasz Cwik <lc...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Google Cloud Dataflow won't override your setting. The dynamic
>>>>>>>> sharding occurs if you don't explicitly set a numShard value.
>>>>>>>>
>>>>>>>> On Wed, May 24, 2017 at 9:14 AM, Josh <jo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Lukasz,
>>>>>>>>>
>>>>>>>>> Thanks for the example. That sounds like a nice solution -
>>>>>>>>> I am running on Dataflow though, which dynamically sets numShards
>>>>>>>>> - so if I set numShards to 1 on each of those AvroIO writers, I can't be
>>>>>>>>> sure that Dataflow isn't going to override my setting right? I guess this
>>>>>>>>> should work fine as long as I partition my stream into a large enough
>>>>>>>>> number of partitions so that Dataflow won't override numShards.
>>>>>>>>>
>>>>>>>>> Josh
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, May 24, 2017 at 4:10 PM, Lukasz Cwik <lc...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Since you're using a small number of shards, add a Partition
>>>>>>>>>> transform which uses a deterministic hash of the key to choose one of 4
>>>>>>>>>> partitions. Write each partition with a single shard.
>>>>>>>>>>
>>>>>>>>>> (Fixed width diagram below)
>>>>>>>>>> Pipeline -> AvroIO(numShards = 4)
>>>>>>>>>> Becomes:
>>>>>>>>>> Pipeline -> Partition --> AvroIO(numShards = 1)
>>>>>>>>>>                       |-> AvroIO(numShards = 1)
>>>>>>>>>>                       |-> AvroIO(numShards = 1)
>>>>>>>>>>                       \-> AvroIO(numShards = 1)
>>>>>>>>>>
>>>>>>>>>> On Wed, May 24, 2017 at 1:05 AM, Josh <jo...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> I am using a FileBasedSink (AvroIO.write) on an unbounded stream
>>>>>>>>>>> (withWindowedWrites, hourly windows, numShards=4).
>>>>>>>>>>>
>>>>>>>>>>> I would like to partition the stream by some key in the element,
>>>>>>>>>>> so that all elements with the same key will get processed by the same shard
>>>>>>>>>>> writer, and therefore written to the same file. Is there a way to do this?
>>>>>>>>>>> Note that in my stream the number of keys is very large (most elements have
>>>>>>>>>>> a unique key, while a few elements share a key).
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Josh
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: How to partition a stream by key before writing with FileBasedSink?

Posted by jo...@gmail.com.
Hmm ok, I don't quite get why what I want to do isn't supported in Beam ... I don't actually have a limited parallelism requirement; I just want to be able to partition my unbounded stream by a key determined from the elements, so that any two elements with the same key will be routed to the same worker. I want to do this because my DoFn keeps some in-memory cached state for each key (which I was planning to store at either the DoFn or JVM level). Does this sound like a bad idea?
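
For clarity, the kind of per-key cache being described might look roughly
like the sketch below. It is purely illustrative: CachedValue and its
update() method are made-up names, and a worker-local cache like this can
only ever be a best-effort optimization, since (as discussed in this thread)
Beam does not guarantee which worker or DoFn instance sees a given key:

    // Illustrative only - a worker-local, best-effort cache keyed by the element key.
    public class CachingDoFn extends DoFn<MyElement, MyElement> {

      // JVM-level cache shared by all CachingDoFn instances in this worker process.
      private static final ConcurrentMap<String, CachedValue> CACHE = new ConcurrentHashMap<>();

      @ProcessElement
      public void processElement(ProcessContext c) {
        MyElement e = c.element();
        CachedValue cached = CACHE.computeIfAbsent(e.getKey().toString(), k -> new CachedValue());
        cached.update(e);   // placeholder update logic
        c.output(e);
      }
    }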


> On 6 Jun 2017, at 19:14, Lukasz Cwik <lc...@google.com> wrote:
> 
> You're right, the window acts as a secondary key within GroupByKey (KeyA,Window1 != KeyA,Window2), which means that each of those two composite keys can be scheduled to execute at the same time.
> 
> At this point I think you should challenge your limited parallelism requirement as you'll need to build something outside of Apache Beam to provide these parallelization limits across windows (e.g. lock within the same process when limiting yourself to a single machine, distributed lock service when dealing with multiple machines).
> 
> The backlog of data is either going to grow infinitely at the GroupByKey or grow infinitely at the source if your pipeline can't keep up. It is up to the Runner to be smart and not produce a giant backlog at the GroupByKey since it knows how fast work is being completed (unfortunately I don't know if any Runner is smart enough yet to push the backlog up to the source).
> 
>> On Tue, Jun 6, 2017 at 11:03 AM, Josh <jo...@gmail.com> wrote:
>> I see, thanks for the tips!
>> 
>> Last question about this! How could this be adapted to work in an unbounded/streaming job? To work in an unbounded job, I need to put a Window.into with a trigger before GroupByKey.
>> I guess this would mean that the "shard gets processed by a single thread in MyDofn" guarantee will only apply to messages within a single window, and would not apply across windows?
>> If this is the case, is there a better solution? I would like to avoid buffering data in windows, and want the shard guarantee to apply across windows.
>> 
>> 
>> 
>>> On Tue, Jun 6, 2017 at 5:42 PM, Lukasz Cwik <lc...@google.com> wrote:
>>> Your code looks like what I was describing. My only comment would be to use a deterministic hashing function which is stable across JVM versions and JVM instances as it will help in making your pipeline consistent across different runs/environments.
>>> 
>>> Parallelizing across 8 instances instead of 4 would break the contract around GroupByKey (since it didn't group all the elements for a key correctly). Also, each element is the smallest unit of work and specifically in your pipeline you have chosen to reduce all your elements into 4 logical elements (each containing some proportion of your original data).
>>> 
>>>> On Tue, Jun 6, 2017 at 9:37 AM, Josh <jo...@gmail.com> wrote:
>>>> Thanks for the reply, Lukasz. 
>>>> 
>>>> What I meant was that I want to shard my data by a "shard key", and be sure that any two elements with the same "shard key" are processed by the same thread on the same worker. (Or if that's not possible, by the same worker JVM with no thread guarantee would be good enough). It doesn't actually matter to me whether there's 1 or 4 or 100 DoFn instances processing the data.
>>>> 
>>>> It sounds like what you suggested will work for this, with the downside of me needing to choose a number of shards/DoFns (e.g. 4).
>>>> It seems a bit long and messy but am I right in thinking it would look like this? ...
>>>> 
>>>> PCollection<MyElement> elements = ...;
>>>> elements
>>>>             .apply(MapElements
>>>>                 .into(TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptor.of(MyElement.class)))
>>>>                 .via((MyElement e) -> KV.of(
>>>>                     e.getKey().toString().hashCode() % 4, e)))
>>>>             .apply(GroupByKey.create())
>>>>             .apply(Partition.of(4,
>>>>                 (Partition.PartitionFn<KV<Integer, Iterable<MyElement>>>) (kv, i) -> kv.getKey()))
>>>>             .apply(ParDo.of(new MyDofn()));
>>>> // Where MyDofn must be changed to handle a KV<Integer, Iterable<MyElement>> as input instead of just a MyElement
>>>> 
>>>> I was wondering is there a guarantee that the runner won't parallelise the final MyDofn across e.g. 8 instances instead of 4? If there are two input elements with the same key are they actually guaranteed to be processed on the same instance?
>>>> 
>>>> Thanks,
>>>> Josh
>>>> 
>>>> 
>>>> 
>>>>> On Tue, Jun 6, 2017 at 4:51 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>> I think this is what you're asking for, but your statement about 4 instances is unclear as to whether that is 4 copies of the same DoFn or 4 completely different DoFns. Also it's unclear what you mean by instance/thread, I'm assuming that you want at most 4 instances of a DoFn each being processed by a single thread.
>>>>> 
>>>>> This is a bad idea because you limit your parallelism but this is similar to what the default file sharding logic does. In Apache Beam the smallest unit of output for a GroupByKey is a single key+iterable pair. We exploit this by assigning all our values to a fixed number of keys and then performing a GroupByKey. This is the same trick that powers the file sharding logic in AvroIO/TextIO/... 
>>>>> 
>>>>> Your pipeline would look like (fixed width font diagram):
>>>>> your data      -> apply shard key       -> GroupByKey        -> partition by key -> your dofn #1
>>>>>                                                                                  \> your dofn #2
>>>>>                                                                                  \> ...
>>>>> a  / b / c / d -> 1,a / 2,b / 1,c / 2,d -> 1,[a,c] / 2,[b,d] -> ???
>>>>> 
>>>>> This is not exactly the same as processing a single DoFn instance/thread because it relies on the Runner to be able to schedule each key to be processed on a different machine. For example a Runner may choose to process value 1,[a,c] and 2,[b,d] sequentially on the same machine or may choose to distribute them.
>>>>> 
>>>>> 
>>>>> 
>>>>>> On Tue, Jun 6, 2017 at 8:13 AM, Josh <jo...@gmail.com> wrote:
>>>>>> Hey Lukasz,
>>>>>> 
>>>>>> I have a follow up question about this - 
>>>>>> 
>>>>>> What if I want to do something very similar, but instead of with 4 instances of AvroIO following the partition transform, I want 4 instances of a DoFn that I've written. I want to ensure that each partition is processed by a single DoFn instance/thread. Is this possible with Beam?
>>>>>> 
>>>>>> Thanks,
>>>>>> Josh
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>>> On Wed, May 24, 2017 at 6:15 PM, Josh <jo...@gmail.com> wrote:
>>>>>>> Ahh I see - Ok I'll try out this solution then. Thanks Lukasz!
>>>>>>> 
>>>>>>>> On Wed, May 24, 2017 at 5:20 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>> Google Cloud Dataflow won't override your setting. The dynamic sharding occurs if you don't explicitly set a numShard value.
>>>>>>>> 
>>>>>>>>> On Wed, May 24, 2017 at 9:14 AM, Josh <jo...@gmail.com> wrote:
>>>>>>>>> Hi Lukasz,
>>>>>>>>> 
>>>>>>>>> Thanks for the example. That sounds like a nice solution - 
>>>>>>>>> I am running on Dataflow though, which dynamically sets numShards - so if I set numShards to 1 on each of those AvroIO writers, I can't be sure that Dataflow isn't going to override my setting right? I guess this should work fine as long as I partition my stream into a large enough number of partitions so that Dataflow won't override numShards.
>>>>>>>>> 
>>>>>>>>> Josh
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>>> On Wed, May 24, 2017 at 4:10 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>>>> Since you're using a small number of shards, add a Partition transform which uses a deterministic hash of the key to choose one of 4 partitions. Write each partition with a single shard.
>>>>>>>>>> 
>>>>>>>>>> (Fixed width diagram below)
>>>>>>>>>> Pipeline -> AvroIO(numShards = 4)
>>>>>>>>>> Becomes:
>>>>>>>>>> Pipeline -> Partition --> AvroIO(numShards = 1)
>>>>>>>>>>                       |-> AvroIO(numShards = 1)
>>>>>>>>>>                       |-> AvroIO(numShards = 1)
>>>>>>>>>>                       \-> AvroIO(numShards = 1)
>>>>>>>>>> 
>>>>>>>>>>> On Wed, May 24, 2017 at 1:05 AM, Josh <jo...@gmail.com> wrote:
>>>>>>>>>>> Hi,
>>>>>>>>>>> 
>>>>>>>>>>> I am using a FileBasedSink (AvroIO.write) on an unbounded stream (withWindowedWrites, hourly windows, numShards=4).
>>>>>>>>>>> 
>>>>>>>>>>> I would like to partition the stream by some key in the element, so that all elements with the same key will get processed by the same shard writer, and therefore written to the same file. Is there a way to do this? Note that in my stream the number of keys is very large (most elements have a unique key, while a few elements share a key).
>>>>>>>>>>> 
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Josh
> 

Re: How to partition a stream by key before writing with FileBasedSink?

Posted by Lukasz Cwik <lc...@google.com>.
You're right, the window acts as a secondary key within GroupByKey
(KeyA,Window1 != KeyA,Window2), which means that each of those two
composite keys can be scheduled to execute at the same time.

At this point I think you should challenge your limited parallelism
requirement as you'll need to build something outside of Apache Beam to
provide these parallelization limits across windows (e.g. lock within the
same process when limiting yourself to a single machine, distributed lock
service when dealing with multiple machines).

The backlog of data is either going to grow infinitely at the GroupByKey or
grow infinitely at the source if your pipeline can't keep up. It is up to
the Runner to be smart and not produce a giant backlog at the GroupByKey
since it knows how fast work is being completed (unfortunately I don't know
if any Runner is smart enough yet to push the backlog up to the source).
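
One practical note on the shard-key code quoted further down: hashCode() can
be negative, so hashCode() % 4 can yield a negative shard number, which
Partition will reject. A rough sketch of a shard-key function that stays in
range (and, per the suggestion below, is deterministic across runs - the
NUM_SHARDS constant and key accessor are placeholders):

    // Sketch: derive a non-negative, deterministic shard id from the element key.
    static final int NUM_SHARDS = 4;

    static int shardFor(MyElement e) {
      // String.hashCode() is specified by the JDK and stable, but can be negative;
      // Math.floorMod keeps the result in [0, NUM_SHARDS).
      return Math.floorMod(e.getKey().toString().hashCode(), NUM_SHARDS);
    }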

On Tue, Jun 6, 2017 at 11:03 AM, Josh <jo...@gmail.com> wrote:

> I see, thanks for the tips!
>
> Last question about this! How could this be adapted to work in an
> unbounded/streaming job? To work in an unbounded job, I need to put a
> Window.into with a trigger before GroupByKey.
> I guess this would mean that the "shard gets processed by a single thread
> in MyDofn" guarantee will only apply to messages within a single window,
> and would not apply across windows?
> If this is the case, is there a better solution? I would like to avoid
> buffering data in windows, and want the shard guarantee to apply across
> windows.
>
>
>
> On Tue, Jun 6, 2017 at 5:42 PM, Lukasz Cwik <lc...@google.com> wrote:
>
>> Your code looks like what I was describing. My only comment would be to
>> use a deterministic hashing function which is stable across JVM versions
>> and JVM instances as it will help in making your pipeline consistent across
>> different runs/environments.
>>
>> Parallelizing across 8 instances instead of 4 would break the contract
>> around GroupByKey (since it didn't group all the elements for a key
>> correctly). Also, each element is the smallest unit of work and
>> specifically in your pipeline you have chosen to reduce all your elements
>> into 4 logical elements (each containing some proportion of your original
>> data).
>>
>> On Tue, Jun 6, 2017 at 9:37 AM, Josh <jo...@gmail.com> wrote:
>>
>>> Thanks for the reply, Lukasz.
>>>
>>>
>>> What I meant was that I want to shard my data by a "shard key", and be
>>> sure that any two elements with the same "shard key" are processed by the
>>> same thread on the same worker. (Or if that's not possible, by the same
>>> worker JVM with no thread guarantee would be good enough). It doesn't
>>> actually matter to me whether there's 1 or 4 or 100 DoFn instances
>>> processing the data.
>>>
>>>
>>> It sounds like what you suggested will work for this, with the downside
>>> of me needing to choose a number of shards/DoFns (e.g. 4).
>>>
>>> It seems a bit long and messy but am I right in thinking it would look
>>> like this? ...
>>>
>>>
>>> PCollection<MyElement> elements = ...;
>>>
>>> elements
>>>
>>> .apply(MapElements
>>>
>>> .into(TypeDescriptors.kvs(TypeDescriptors.integers(),
>>> TypeDescriptor.of(MyElement.class)))
>>>
>>> .via((MyElement e) -> KV.of(
>>>
>>> e.getKey().toString().hashCode() % 4, e)))
>>>
>>> .apply(GroupByKey.create())
>>>
>>> .apply(Partition.of(4,
>>>
>>> (Partition.PartitionFn<KV<Integer, Iterable<MyElement>>>) (kv, i) ->
>>> kv.getKey()))
>>>
>>> .apply(ParDo.of(new MyDofn()));
>>>
>>> // Where MyDofn must be changed to handle a KV<Integer,
>>> Iterable<MyElement>> as input instead of just a MyElement
>>>
>>>
>>> I was wondering is there a guarantee that the runner won't parallelise
>>> the final MyDofn across e.g. 8 instances instead of 4? If there are two
>>> input elements with the same key are they actually guaranteed to be
>>> processed on the same instance?
>>>
>>>
>>> Thanks,
>>>
>>> Josh
>>>
>>>
>>>
>>>
>>> On Tue, Jun 6, 2017 at 4:51 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> I think this is what your asking for but your statement about 4
>>>> instances is unclear as to whether that is 4 copies of the same DoFn or 4
>>>> completely different DoFns. Also its unclear what you mean by
>>>> instance/thread, I'm assuming that you want at most 4 instances of a DoFn
>>>> each being processed by a single thread.
>>>>
>>>> This is a bad idea because you limit your parallelism but this is
>>>> similar to what the default file sharding logic does. In Apache Beam the
>>>> smallest unit of output for a GroupByKey is a single key+iterable pair. We
>>>> exploit this by assigning all our values to a fixed number of keys and then
>>>> performing a GroupByKey. This is the same trick that powers the file
>>>> sharding logic in AvroIO/TextIO/...
>>>>
>>>> Your pipeline would look like (fixed width font diagram):
>>>> your data      -> apply shard key       -> GroupByKey        ->
>>>> partition by key -> your dofn #1
>>>>
>>>>          \> your dofn #2
>>>>
>>>>          \> ...
>>>> a  / b / c / d -> 1,a / 2,b / 1,c / 2,d -> 1,[a,c] / 2,[b,d] -> ???
>>>>
>>>> This is not exactly the same as processing a single DoFn
>>>> instance/thread because it relies on the Runner to be able to schedule each
>>>> key to be processed on a different machine. For example a Runner may choose
>>>> to process value 1,[a,c] and 2,[b,d] sequentially on the same machine or
>>>> may choose to distribute them.
>>>>
>>>>
>>>>
>>>> On Tue, Jun 6, 2017 at 8:13 AM, Josh <jo...@gmail.com> wrote:
>>>>
>>>>> Hey Lukasz,
>>>>>
>>>>> I have a follow up question about this -
>>>>>
>>>>> What if I want to do something very similar, but instead of with 4
>>>>> instances of AvroIO following the partition transform, I want 4 instances
>>>>> of a DoFn that I've written. I want to ensure that each partition is
>>>>> processed by a single DoFn instance/thread. Is this possible with Beam?
>>>>>
>>>>> Thanks,
>>>>> Josh
>>>>>
>>>>>
>>>>>
>>>>> On Wed, May 24, 2017 at 6:15 PM, Josh <jo...@gmail.com> wrote:
>>>>>
>>>>>> Ahh I see - Ok I'll try out this solution then. Thanks Lukasz!
>>>>>>
>>>>>> On Wed, May 24, 2017 at 5:20 PM, Lukasz Cwik <lc...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Google Cloud Dataflow won't override your setting. The dynamic
>>>>>>> sharding occurs if you don't explicitly set a numShard value.
>>>>>>>
>>>>>>> On Wed, May 24, 2017 at 9:14 AM, Josh <jo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Lukasz,
>>>>>>>>
>>>>>>>> Thanks for the example. That sounds like a nice solution -
>>>>>>>> I am running on Dataflow though, which dynamically sets numShards -
>>>>>>>> so if I set numShards to 1 on each of those AvroIO writers, I can't be sure
>>>>>>>> that Dataflow isn't going to override my setting right? I guess this should
>>>>>>>> work fine as long as I partition my stream into a large enough number of
>>>>>>>> partitions so that Dataflow won't override numShards.
>>>>>>>>
>>>>>>>> Josh
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, May 24, 2017 at 4:10 PM, Lukasz Cwik <lc...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Since your using a small number of shards, add a Partition
>>>>>>>>> transform which uses a deterministic hash of the key to choose one of 4
>>>>>>>>> partitions. Write each partition with a single shard.
>>>>>>>>>
>>>>>>>>> (Fixed width diagram below)
>>>>>>>>> Pipeline -> AvroIO(numShards = 4)
>>>>>>>>> Becomes:
>>>>>>>>> Pipeline -> Partition --> AvroIO(numShards = 1)
>>>>>>>>>                       |-> AvroIO(numShards = 1)
>>>>>>>>>                       |-> AvroIO(numShards = 1)
>>>>>>>>>                       \-> AvroIO(numShards = 1)
>>>>>>>>>
>>>>>>>>> On Wed, May 24, 2017 at 1:05 AM, Josh <jo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> I am using a FileBasedSink (AvroIO.write) on an unbounded stream
>>>>>>>>>> (withWindowedWrites, hourly windows, numShards=4).
>>>>>>>>>>
>>>>>>>>>> I would like to partition the stream by some key in the element,
>>>>>>>>>> so that all elements with the same key will get processed by the same shard
>>>>>>>>>> writer, and therefore written to the same file. Is there a way to do this?
>>>>>>>>>> Note that in my stream the number of keys is very large (most elements have
>>>>>>>>>> a unique key, while a few elements share a key).
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Josh
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: How to partition a stream by key before writing with FileBasedSink?

Posted by Josh <jo...@gmail.com>.
I see, thanks for the tips!

Last question about this! How could this be adapted to work in an
unbounded/streaming job? To work in an unbounded job, I need to put a
Window.into with a trigger before the GroupByKey.
I guess this would mean that the "shard gets processed by a single thread
in MyDofn" guarantee will only apply to messages within a single window,
and would not apply across windows?
If this is the case, is there a better solution? I would like to avoid
buffering data in windows, and want the shard guarantee to apply across
windows.
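
(For reference, this is the kind of Window.into + trigger I mean before the
GroupByKey - just a rough sketch, the window size and trigger delay are made up:)

.apply(Window.<KV<Integer, MyElement>>into(FixedWindows.of(Duration.standardMinutes(1)))
    .triggering(Repeatedly.forever(
        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(10))))
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes())
.apply(GroupByKey.create())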



On Tue, Jun 6, 2017 at 5:42 PM, Lukasz Cwik <lc...@google.com> wrote:

> Your code looks like what I was describing. My only comment would be to
> use a deterministic hashing function which is stable across JVM versions
> and JVM instances as it will help in making your pipeline consistent across
> different runs/environments.
>
> Parallelizing across 8 instances instead of 4 would break the contract
> around GroupByKey (since it didn't group all the elements for a key
> correctly). Also, each element is the smallest unit of work and
> specifically in your pipeline you have chosen to reduce all your elements
> into 4 logical elements (each containing some proportion of your original
> data).
>
> On Tue, Jun 6, 2017 at 9:37 AM, Josh <jo...@gmail.com> wrote:
>
>> Thanks for the reply, Lukasz.
>>
>>
>> What I meant was that I want to shard my data by a "shard key", and be
>> sure that any two elements with the same "shard key" are processed by the
>> same thread on the same worker. (Or if that's not possible, by the same
>> worker JVM with no thread guarantee would be good enough). It doesn't
>> actually matter to me whether there's 1 or 4 or 100 DoFn instances
>> processing the data.
>>
>>
>> It sounds like what you suggested will work for this, with the downside
>> of me needing to choose a number of shards/DoFns (e.g. 4).
>>
>> It seems a bit long and messy but am I right in thinking it would look
>> like this? ...
>>
>>
>> PCollection<MyElement> elements = ...;
>>
>> elements
>>
>> .apply(MapElements
>>
>> .into(TypeDescriptors.kvs(TypeDescriptors.integers(),
>> TypeDescriptor.of(MyElement.class)))
>>
>> .via((MyElement e) -> KV.of(
>>
>> e.getKey().toString().hashCode() % 4, e)))
>>
>> .apply(GroupByKey.create())
>>
>> .apply(Partition.of(4,
>>
>> (Partition.PartitionFn<KV<Integer, Iterable<MyElement>>>) (kv, i) ->
>> kv.getKey()))
>>
>> .apply(ParDo.of(new MyDofn()));
>>
>> // Where MyDofn must be changed to handle a KV<Integer,
>> Iterable<MyElement>> as input instead of just a MyElement
>>
>>
>> I was wondering is there a guarantee that the runner won't parallelise
>> the final MyDofn across e.g. 8 instances instead of 4? If there are two
>> input elements with the same key are they actually guaranteed to be
>> processed on the same instance?
>>
>>
>> Thanks,
>>
>> Josh
>>
>>
>>
>>
>> On Tue, Jun 6, 2017 at 4:51 PM, Lukasz Cwik <lc...@google.com> wrote:
>>
>>> I think this is what your asking for but your statement about 4
>>> instances is unclear as to whether that is 4 copies of the same DoFn or 4
>>> completely different DoFns. Also its unclear what you mean by
>>> instance/thread, I'm assuming that you want at most 4 instances of a DoFn
>>> each being processed by a single thread.
>>>
>>> This is a bad idea because you limit your parallelism but this is
>>> similar to what the default file sharding logic does. In Apache Beam the
>>> smallest unit of output for a GroupByKey is a single key+iterable pair. We
>>> exploit this by assigning all our values to a fixed number of keys and then
>>> performing a GroupByKey. This is the same trick that powers the file
>>> sharding logic in AvroIO/TextIO/...
>>>
>>> Your pipeline would look like (fixed width font diagram):
>>> your data      -> apply shard key       -> GroupByKey        ->
>>> partition by key -> your dofn #1
>>>
>>>          \> your dofn #2
>>>
>>>          \> ...
>>> a  / b / c / d -> 1,a / 2,b / 1,c / 2,d -> 1,[a,c] / 2,[b,d] -> ???
>>>
>>> This is not exactly the same as processing a single DoFn instance/thread
>>> because it relies on the Runner to be able to schedule each key to be
>>> processed on a different machine. For example a Runner may choose to
>>> process value 1,[a,c] and 2,[b,d] sequentially on the same machine or may
>>> choose to distribute them.
>>>
>>>
>>>
>>> On Tue, Jun 6, 2017 at 8:13 AM, Josh <jo...@gmail.com> wrote:
>>>
>>>> Hey Lukasz,
>>>>
>>>> I have a follow up question about this -
>>>>
>>>> What if I want to do something very similar, but instead of with 4
>>>> instances of AvroIO following the partition transform, I want 4 instances
>>>> of a DoFn that I've written. I want to ensure that each partition is
>>>> processed by a single DoFn instance/thread. Is this possible with Beam?
>>>>
>>>> Thanks,
>>>> Josh
>>>>
>>>>
>>>>
>>>> On Wed, May 24, 2017 at 6:15 PM, Josh <jo...@gmail.com> wrote:
>>>>
>>>>> Ahh I see - Ok I'll try out this solution then. Thanks Lukasz!
>>>>>
>>>>> On Wed, May 24, 2017 at 5:20 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> Google Cloud Dataflow won't override your setting. The dynamic
>>>>>> sharding occurs if you don't explicitly set a numShard value.
>>>>>>
>>>>>> On Wed, May 24, 2017 at 9:14 AM, Josh <jo...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Lukasz,
>>>>>>>
>>>>>>> Thanks for the example. That sounds like a nice solution -
>>>>>>> I am running on Dataflow though, which dynamically sets numShards -
>>>>>>> so if I set numShards to 1 on each of those AvroIO writers, I can't be sure
>>>>>>> that Dataflow isn't going to override my setting right? I guess this should
>>>>>>> work fine as long as I partition my stream into a large enough number of
>>>>>>> partitions so that Dataflow won't override numShards.
>>>>>>>
>>>>>>> Josh
>>>>>>>
>>>>>>>
>>>>>>> On Wed, May 24, 2017 at 4:10 PM, Lukasz Cwik <lc...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Since your using a small number of shards, add a Partition
>>>>>>>> transform which uses a deterministic hash of the key to choose one of 4
>>>>>>>> partitions. Write each partition with a single shard.
>>>>>>>>
>>>>>>>> (Fixed width diagram below)
>>>>>>>> Pipeline -> AvroIO(numShards = 4)
>>>>>>>> Becomes:
>>>>>>>> Pipeline -> Partition --> AvroIO(numShards = 1)
>>>>>>>>                       |-> AvroIO(numShards = 1)
>>>>>>>>                       |-> AvroIO(numShards = 1)
>>>>>>>>                       \-> AvroIO(numShards = 1)
>>>>>>>>
>>>>>>>> On Wed, May 24, 2017 at 1:05 AM, Josh <jo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I am using a FileBasedSink (AvroIO.write) on an unbounded stream
>>>>>>>>> (withWindowedWrites, hourly windows, numShards=4).
>>>>>>>>>
>>>>>>>>> I would like to partition the stream by some key in the element,
>>>>>>>>> so that all elements with the same key will get processed by the same shard
>>>>>>>>> writer, and therefore written to the same file. Is there a way to do this?
>>>>>>>>> Note that in my stream the number of keys is very large (most elements have
>>>>>>>>> a unique key, while a few elements share a key).
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Josh
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: How to partition a stream by key before writing with FileBasedSink?

Posted by Lukasz Cwik <lc...@google.com>.
Your code looks like what I was describing. My only comment would be to use
a deterministic hashing function which is stable across JVM versions and
JVM instances as it will help in making your pipeline consistent across
different runs/environments.

Parallelizing across 8 instances instead of 4 would break the contract
around GroupByKey (since it didn't group all the elements for a key
correctly). Also, each element is the smallest unit of work and
specifically in your pipeline you have chosen to reduce all your elements
into 4 logical elements (each containing some proportion of your original
data).
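
For example (a sketch only, assuming Guava's Hashing is on the classpath), the
key assignment could become:

.via((MyElement e) -> KV.of(
    // murmur3 over the string form of the key is stable across JVM versions/instances;
    // floorMod keeps the result in [0, 4) even for negative hash values
    Math.floorMod(
        Hashing.murmur3_32()
            .hashString(e.getKey().toString(), StandardCharsets.UTF_8)
            .asInt(),
        4),
    e))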

On Tue, Jun 6, 2017 at 9:37 AM, Josh <jo...@gmail.com> wrote:

> Thanks for the reply, Lukasz.
>
>
> What I meant was that I want to shard my data by a "shard key", and be
> sure that any two elements with the same "shard key" are processed by the
> same thread on the same worker. (Or if that's not possible, by the same
> worker JVM with no thread guarantee would be good enough). It doesn't
> actually matter to me whether there's 1 or 4 or 100 DoFn instances
> processing the data.
>
>
> It sounds like what you suggested will work for this, with the downside of
> me needing to choose a number of shards/DoFns (e.g. 4).
>
> It seems a bit long and messy but am I right in thinking it would look
> like this? ...
>
>
> PCollection<MyElement> elements = ...;
>
> elements
>
> .apply(MapElements
>
> .into(TypeDescriptors.kvs(TypeDescriptors.integers(),
> TypeDescriptor.of(MyElement.class)))
>
> .via((MyElement e) -> KV.of(
>
> e.getKey().toString().hashCode() % 4, e)))
>
> .apply(GroupByKey.create())
>
> .apply(Partition.of(4,
>
> (Partition.PartitionFn<KV<Integer, Iterable<MyElement>>>) (kv, i) ->
> kv.getKey()))
>
> .apply(ParDo.of(new MyDofn()));
>
> // Where MyDofn must be changed to handle a KV<Integer,
> Iterable<MyElement>> as input instead of just a MyElement
>
>
> I was wondering is there a guarantee that the runner won't parallelise the
> final MyDofn across e.g. 8 instances instead of 4? If there are two input
> elements with the same key are they actually guaranteed to be processed on
> the same instance?
>
>
> Thanks,
>
> Josh
>
>
>
>
> On Tue, Jun 6, 2017 at 4:51 PM, Lukasz Cwik <lc...@google.com> wrote:
>
>> I think this is what your asking for but your statement about 4 instances
>> is unclear as to whether that is 4 copies of the same DoFn or 4 completely
>> different DoFns. Also its unclear what you mean by instance/thread, I'm
>> assuming that you want at most 4 instances of a DoFn each being processed
>> by a single thread.
>>
>> This is a bad idea because you limit your parallelism but this is similar
>> to what the default file sharding logic does. In Apache Beam the smallest
>> unit of output for a GroupByKey is a single key+iterable pair. We exploit
>> this by assigning all our values to a fixed number of keys and then
>> performing a GroupByKey. This is the same trick that powers the file
>> sharding logic in AvroIO/TextIO/...
>>
>> Your pipeline would look like (fixed width font diagram):
>> your data      -> apply shard key       -> GroupByKey        -> partition
>> by key -> your dofn #1
>>
>>        \> your dofn #2
>>
>>        \> ...
>> a  / b / c / d -> 1,a / 2,b / 1,c / 2,d -> 1,[a,c] / 2,[b,d] -> ???
>>
>> This is not exactly the same as processing a single DoFn instance/thread
>> because it relies on the Runner to be able to schedule each key to be
>> processed on a different machine. For example a Runner may choose to
>> process value 1,[a,c] and 2,[b,d] sequentially on the same machine or may
>> choose to distribute them.
>>
>>
>>
>> On Tue, Jun 6, 2017 at 8:13 AM, Josh <jo...@gmail.com> wrote:
>>
>>> Hey Lukasz,
>>>
>>> I have a follow up question about this -
>>>
>>> What if I want to do something very similar, but instead of with 4
>>> instances of AvroIO following the partition transform, I want 4 instances
>>> of a DoFn that I've written. I want to ensure that each partition is
>>> processed by a single DoFn instance/thread. Is this possible with Beam?
>>>
>>> Thanks,
>>> Josh
>>>
>>>
>>>
>>> On Wed, May 24, 2017 at 6:15 PM, Josh <jo...@gmail.com> wrote:
>>>
>>>> Ahh I see - Ok I'll try out this solution then. Thanks Lukasz!
>>>>
>>>> On Wed, May 24, 2017 at 5:20 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> Google Cloud Dataflow won't override your setting. The dynamic
>>>>> sharding occurs if you don't explicitly set a numShard value.
>>>>>
>>>>> On Wed, May 24, 2017 at 9:14 AM, Josh <jo...@gmail.com> wrote:
>>>>>
>>>>>> Hi Lukasz,
>>>>>>
>>>>>> Thanks for the example. That sounds like a nice solution -
>>>>>> I am running on Dataflow though, which dynamically sets numShards -
>>>>>> so if I set numShards to 1 on each of those AvroIO writers, I can't be sure
>>>>>> that Dataflow isn't going to override my setting right? I guess this should
>>>>>> work fine as long as I partition my stream into a large enough number of
>>>>>> partitions so that Dataflow won't override numShards.
>>>>>>
>>>>>> Josh
>>>>>>
>>>>>>
>>>>>> On Wed, May 24, 2017 at 4:10 PM, Lukasz Cwik <lc...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Since your using a small number of shards, add a Partition transform
>>>>>>> which uses a deterministic hash of the key to choose one of 4 partitions.
>>>>>>> Write each partition with a single shard.
>>>>>>>
>>>>>>> (Fixed width diagram below)
>>>>>>> Pipeline -> AvroIO(numShards = 4)
>>>>>>> Becomes:
>>>>>>> Pipeline -> Partition --> AvroIO(numShards = 1)
>>>>>>>                       |-> AvroIO(numShards = 1)
>>>>>>>                       |-> AvroIO(numShards = 1)
>>>>>>>                       \-> AvroIO(numShards = 1)
>>>>>>>
>>>>>>> On Wed, May 24, 2017 at 1:05 AM, Josh <jo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I am using a FileBasedSink (AvroIO.write) on an unbounded stream
>>>>>>>> (withWindowedWrites, hourly windows, numShards=4).
>>>>>>>>
>>>>>>>> I would like to partition the stream by some key in the element, so
>>>>>>>> that all elements with the same key will get processed by the same shard
>>>>>>>> writer, and therefore written to the same file. Is there a way to do this?
>>>>>>>> Note that in my stream the number of keys is very large (most elements have
>>>>>>>> a unique key, while a few elements share a key).
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Josh
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: How to partition a stream by key before writing with FileBasedSink?

Posted by Josh <jo...@gmail.com>.
Thanks for the reply, Lukasz.


What I meant was that I want to shard my data by a "shard key", and be sure
that any two elements with the same "shard key" are processed by the same
thread on the same worker. (Or, if that's not possible, processing by the same
worker JVM with no thread guarantee would be good enough.) It doesn't actually
matter to me whether there's 1 or 4 or 100 DoFn instances processing the
data.


It sounds like what you suggested will work for this, with the downside of
me needing to choose a number of shards/DoFns (e.g. 4).

It seems a bit long and messy but am I right in thinking it would look like
this? ...


PCollection<MyElement> elements = ...;

elements
    .apply(MapElements
        .into(TypeDescriptors.kvs(
            TypeDescriptors.integers(), TypeDescriptor.of(MyElement.class)))
        .via((MyElement e) -> KV.of(
            // floorMod keeps the shard index in [0, 4) even when hashCode() is negative
            Math.floorMod(e.getKey().toString().hashCode(), 4), e)))
    .apply(GroupByKey.create())
    .apply(Partition.of(4,
        (Partition.PartitionFn<KV<Integer, Iterable<MyElement>>>) (kv, i) -> kv.getKey()))
    .apply(ParDo.of(new MyDofn()));

// Where MyDofn must be changed to handle a KV<Integer, Iterable<MyElement>>
// as input instead of just a MyElement
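
(Something like this is the shape I have in mind for the changed MyDofn - just
a sketch:)

public class MyDofn extends DoFn<KV<Integer, Iterable<MyElement>>, MyElement> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    // The whole shard arrives as one iterable, so this loop replaces the old
    // per-element processElement body.
    for (MyElement e : c.element().getValue()) {
      c.output(e);
    }
  }
}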


I was wondering is there a guarantee that the runner won't parallelise the
final MyDofn across e.g. 8 instances instead of 4? If there are two input
elements with the same key are they actually guaranteed to be processed on
the same instance?


Thanks,

Josh




On Tue, Jun 6, 2017 at 4:51 PM, Lukasz Cwik <lc...@google.com> wrote:

> I think this is what your asking for but your statement about 4 instances
> is unclear as to whether that is 4 copies of the same DoFn or 4 completely
> different DoFns. Also its unclear what you mean by instance/thread, I'm
> assuming that you want at most 4 instances of a DoFn each being processed
> by a single thread.
>
> This is a bad idea because you limit your parallelism but this is similar
> to what the default file sharding logic does. In Apache Beam the smallest
> unit of output for a GroupByKey is a single key+iterable pair. We exploit
> this by assigning all our values to a fixed number of keys and then
> performing a GroupByKey. This is the same trick that powers the file
> sharding logic in AvroIO/TextIO/...
>
> Your pipeline would look like (fixed width font diagram):
> your data      -> apply shard key       -> GroupByKey        -> partition
> by key -> your dofn #1
>
>        \> your dofn #2
>
>        \> ...
> a  / b / c / d -> 1,a / 2,b / 1,c / 2,d -> 1,[a,c] / 2,[b,d] -> ???
>
> This is not exactly the same as processing a single DoFn instance/thread
> because it relies on the Runner to be able to schedule each key to be
> processed on a different machine. For example a Runner may choose to
> process value 1,[a,c] and 2,[b,d] sequentially on the same machine or may
> choose to distribute them.
>
>
>
> On Tue, Jun 6, 2017 at 8:13 AM, Josh <jo...@gmail.com> wrote:
>
>> Hey Lukasz,
>>
>> I have a follow up question about this -
>>
>> What if I want to do something very similar, but instead of with 4
>> instances of AvroIO following the partition transform, I want 4 instances
>> of a DoFn that I've written. I want to ensure that each partition is
>> processed by a single DoFn instance/thread. Is this possible with Beam?
>>
>> Thanks,
>> Josh
>>
>>
>>
>> On Wed, May 24, 2017 at 6:15 PM, Josh <jo...@gmail.com> wrote:
>>
>>> Ahh I see - Ok I'll try out this solution then. Thanks Lukasz!
>>>
>>> On Wed, May 24, 2017 at 5:20 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> Google Cloud Dataflow won't override your setting. The dynamic sharding
>>>> occurs if you don't explicitly set a numShard value.
>>>>
>>>> On Wed, May 24, 2017 at 9:14 AM, Josh <jo...@gmail.com> wrote:
>>>>
>>>>> Hi Lukasz,
>>>>>
>>>>> Thanks for the example. That sounds like a nice solution -
>>>>> I am running on Dataflow though, which dynamically sets numShards - so
>>>>> if I set numShards to 1 on each of those AvroIO writers, I can't be sure
>>>>> that Dataflow isn't going to override my setting right? I guess this should
>>>>> work fine as long as I partition my stream into a large enough number of
>>>>> partitions so that Dataflow won't override numShards.
>>>>>
>>>>> Josh
>>>>>
>>>>>
>>>>> On Wed, May 24, 2017 at 4:10 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> Since your using a small number of shards, add a Partition transform
>>>>>> which uses a deterministic hash of the key to choose one of 4 partitions.
>>>>>> Write each partition with a single shard.
>>>>>>
>>>>>> (Fixed width diagram below)
>>>>>> Pipeline -> AvroIO(numShards = 4)
>>>>>> Becomes:
>>>>>> Pipeline -> Partition --> AvroIO(numShards = 1)
>>>>>>                       |-> AvroIO(numShards = 1)
>>>>>>                       |-> AvroIO(numShards = 1)
>>>>>>                       \-> AvroIO(numShards = 1)
>>>>>>
>>>>>> On Wed, May 24, 2017 at 1:05 AM, Josh <jo...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I am using a FileBasedSink (AvroIO.write) on an unbounded stream
>>>>>>> (withWindowedWrites, hourly windows, numShards=4).
>>>>>>>
>>>>>>> I would like to partition the stream by some key in the element, so
>>>>>>> that all elements with the same key will get processed by the same shard
>>>>>>> writer, and therefore written to the same file. Is there a way to do this?
>>>>>>> Note that in my stream the number of keys is very large (most elements have
>>>>>>> a unique key, while a few elements share a key).
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Josh
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: How to partition a stream by key before writing with FileBasedSink?

Posted by Lukasz Cwik <lc...@google.com>.
I think this is what you're asking for, but your statement about 4 instances
is unclear as to whether that is 4 copies of the same DoFn or 4 completely
different DoFns. Also, it's unclear what you mean by instance/thread; I'm
assuming that you want at most 4 instances of a DoFn, each being processed
by a single thread.

This is a bad idea because you limit your parallelism but this is similar
to what the default file sharding logic does. In Apache Beam the smallest
unit of output for a GroupByKey is a single key+iterable pair. We exploit
this by assigning all our values to a fixed number of keys and then
performing a GroupByKey. This is the same trick that powers the file
sharding logic in AvroIO/TextIO/...

Your pipeline would look like (fixed width font diagram):
your data      -> apply shard key       -> GroupByKey        -> partition by key -> your dofn #1
                                                                                  \> your dofn #2
                                                                                  \> ...
a  / b / c / d -> 1,a / 2,b / 1,c / 2,d -> 1,[a,c] / 2,[b,d] -> ???

This is not exactly the same as processing a single DoFn instance/thread
because it relies on the Runner to be able to schedule each key to be
processed on a different machine. For example a Runner may choose to
process value 1,[a,c] and 2,[b,d] sequentially on the same machine or may
choose to distribute them.



On Tue, Jun 6, 2017 at 8:13 AM, Josh <jo...@gmail.com> wrote:

> Hey Lukasz,
>
> I have a follow up question about this -
>
> What if I want to do something very similar, but instead of with 4
> instances of AvroIO following the partition transform, I want 4 instances
> of a DoFn that I've written. I want to ensure that each partition is
> processed by a single DoFn instance/thread. Is this possible with Beam?
>
> Thanks,
> Josh
>
>
>
> On Wed, May 24, 2017 at 6:15 PM, Josh <jo...@gmail.com> wrote:
>
>> Ahh I see - Ok I'll try out this solution then. Thanks Lukasz!
>>
>> On Wed, May 24, 2017 at 5:20 PM, Lukasz Cwik <lc...@google.com> wrote:
>>
>>> Google Cloud Dataflow won't override your setting. The dynamic sharding
>>> occurs if you don't explicitly set a numShard value.
>>>
>>> On Wed, May 24, 2017 at 9:14 AM, Josh <jo...@gmail.com> wrote:
>>>
>>>> Hi Lukasz,
>>>>
>>>> Thanks for the example. That sounds like a nice solution -
>>>> I am running on Dataflow though, which dynamically sets numShards - so
>>>> if I set numShards to 1 on each of those AvroIO writers, I can't be sure
>>>> that Dataflow isn't going to override my setting right? I guess this should
>>>> work fine as long as I partition my stream into a large enough number of
>>>> partitions so that Dataflow won't override numShards.
>>>>
>>>> Josh
>>>>
>>>>
>>>> On Wed, May 24, 2017 at 4:10 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> Since your using a small number of shards, add a Partition transform
>>>>> which uses a deterministic hash of the key to choose one of 4 partitions.
>>>>> Write each partition with a single shard.
>>>>>
>>>>> (Fixed width diagram below)
>>>>> Pipeline -> AvroIO(numShards = 4)
>>>>> Becomes:
>>>>> Pipeline -> Partition --> AvroIO(numShards = 1)
>>>>>                       |-> AvroIO(numShards = 1)
>>>>>                       |-> AvroIO(numShards = 1)
>>>>>                       \-> AvroIO(numShards = 1)
>>>>>
>>>>> On Wed, May 24, 2017 at 1:05 AM, Josh <jo...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am using a FileBasedSink (AvroIO.write) on an unbounded stream
>>>>>> (withWindowedWrites, hourly windows, numShards=4).
>>>>>>
>>>>>> I would like to partition the stream by some key in the element, so
>>>>>> that all elements with the same key will get processed by the same shard
>>>>>> writer, and therefore written to the same file. Is there a way to do this?
>>>>>> Note that in my stream the number of keys is very large (most elements have
>>>>>> a unique key, while a few elements share a key).
>>>>>>
>>>>>> Thanks,
>>>>>> Josh
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>

Re: How to partition a stream by key before writing with FileBasedSink?

Posted by Josh <jo...@gmail.com>.
Hey Lukasz,

I have a follow up question about this -

What if I want to do something very similar, but instead of with 4
instances of AvroIO following the partition transform, I want 4 instances
of a DoFn that I've written. I want to ensure that each partition is
processed by a single DoFn instance/thread. Is this possible with Beam?

Thanks,
Josh



On Wed, May 24, 2017 at 6:15 PM, Josh <jo...@gmail.com> wrote:

> Ahh I see - Ok I'll try out this solution then. Thanks Lukasz!
>
> On Wed, May 24, 2017 at 5:20 PM, Lukasz Cwik <lc...@google.com> wrote:
>
>> Google Cloud Dataflow won't override your setting. The dynamic sharding
>> occurs if you don't explicitly set a numShard value.
>>
>> On Wed, May 24, 2017 at 9:14 AM, Josh <jo...@gmail.com> wrote:
>>
>>> Hi Lukasz,
>>>
>>> Thanks for the example. That sounds like a nice solution -
>>> I am running on Dataflow though, which dynamically sets numShards - so
>>> if I set numShards to 1 on each of those AvroIO writers, I can't be sure
>>> that Dataflow isn't going to override my setting right? I guess this should
>>> work fine as long as I partition my stream into a large enough number of
>>> partitions so that Dataflow won't override numShards.
>>>
>>> Josh
>>>
>>>
>>> On Wed, May 24, 2017 at 4:10 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> Since your using a small number of shards, add a Partition transform
>>>> which uses a deterministic hash of the key to choose one of 4 partitions.
>>>> Write each partition with a single shard.
>>>>
>>>> (Fixed width diagram below)
>>>> Pipeline -> AvroIO(numShards = 4)
>>>> Becomes:
>>>> Pipeline -> Partition --> AvroIO(numShards = 1)
>>>>                       |-> AvroIO(numShards = 1)
>>>>                       |-> AvroIO(numShards = 1)
>>>>                       \-> AvroIO(numShards = 1)
>>>>
>>>> On Wed, May 24, 2017 at 1:05 AM, Josh <jo...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I am using a FileBasedSink (AvroIO.write) on an unbounded stream
>>>>> (withWindowedWrites, hourly windows, numShards=4).
>>>>>
>>>>> I would like to partition the stream by some key in the element, so
>>>>> that all elements with the same key will get processed by the same shard
>>>>> writer, and therefore written to the same file. Is there a way to do this?
>>>>> Note that in my stream the number of keys is very large (most elements have
>>>>> a unique key, while a few elements share a key).
>>>>>
>>>>> Thanks,
>>>>> Josh
>>>>>
>>>>
>>>>
>>>
>>
>

Re: How to partition a stream by key before writing with FileBasedSink?

Posted by Josh <jo...@gmail.com>.
Ahh I see - Ok I'll try out this solution then. Thanks Lukasz!

On Wed, May 24, 2017 at 5:20 PM, Lukasz Cwik <lc...@google.com> wrote:

> Google Cloud Dataflow won't override your setting. The dynamic sharding
> occurs if you don't explicitly set a numShard value.
>
> On Wed, May 24, 2017 at 9:14 AM, Josh <jo...@gmail.com> wrote:
>
>> Hi Lukasz,
>>
>> Thanks for the example. That sounds like a nice solution -
>> I am running on Dataflow though, which dynamically sets numShards - so if
>> I set numShards to 1 on each of those AvroIO writers, I can't be sure that
>> Dataflow isn't going to override my setting right? I guess this should work
>> fine as long as I partition my stream into a large enough number of
>> partitions so that Dataflow won't override numShards.
>>
>> Josh
>>
>>
>> On Wed, May 24, 2017 at 4:10 PM, Lukasz Cwik <lc...@google.com> wrote:
>>
>>> Since your using a small number of shards, add a Partition transform
>>> which uses a deterministic hash of the key to choose one of 4 partitions.
>>> Write each partition with a single shard.
>>>
>>> (Fixed width diagram below)
>>> Pipeline -> AvroIO(numShards = 4)
>>> Becomes:
>>> Pipeline -> Partition --> AvroIO(numShards = 1)
>>>                       |-> AvroIO(numShards = 1)
>>>                       |-> AvroIO(numShards = 1)
>>>                       \-> AvroIO(numShards = 1)
>>>
>>> On Wed, May 24, 2017 at 1:05 AM, Josh <jo...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am using a FileBasedSink (AvroIO.write) on an unbounded stream
>>>> (withWindowedWrites, hourly windows, numShards=4).
>>>>
>>>> I would like to partition the stream by some key in the element, so
>>>> that all elements with the same key will get processed by the same shard
>>>> writer, and therefore written to the same file. Is there a way to do this?
>>>> Note that in my stream the number of keys is very large (most elements have
>>>> a unique key, while a few elements share a key).
>>>>
>>>> Thanks,
>>>> Josh
>>>>
>>>
>>>
>>
>

Re: How to partition a stream by key before writing with FileBasedSink?

Posted by Lukasz Cwik <lc...@google.com>.
Google Cloud Dataflow won't override your setting. The dynamic sharding
only occurs if you don't explicitly set a numShards value.

On Wed, May 24, 2017 at 9:14 AM, Josh <jo...@gmail.com> wrote:

> Hi Lukasz,
>
> Thanks for the example. That sounds like a nice solution -
> I am running on Dataflow though, which dynamically sets numShards - so if
> I set numShards to 1 on each of those AvroIO writers, I can't be sure that
> Dataflow isn't going to override my setting right? I guess this should work
> fine as long as I partition my stream into a large enough number of
> partitions so that Dataflow won't override numShards.
>
> Josh
>
>
> On Wed, May 24, 2017 at 4:10 PM, Lukasz Cwik <lc...@google.com> wrote:
>
>> Since your using a small number of shards, add a Partition transform
>> which uses a deterministic hash of the key to choose one of 4 partitions.
>> Write each partition with a single shard.
>>
>> (Fixed width diagram below)
>> Pipeline -> AvroIO(numShards = 4)
>> Becomes:
>> Pipeline -> Partition --> AvroIO(numShards = 1)
>>                       |-> AvroIO(numShards = 1)
>>                       |-> AvroIO(numShards = 1)
>>                       \-> AvroIO(numShards = 1)
>>
>> On Wed, May 24, 2017 at 1:05 AM, Josh <jo...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am using a FileBasedSink (AvroIO.write) on an unbounded stream
>>> (withWindowedWrites, hourly windows, numShards=4).
>>>
>>> I would like to partition the stream by some key in the element, so that
>>> all elements with the same key will get processed by the same shard writer,
>>> and therefore written to the same file. Is there a way to do this? Note
>>> that in my stream the number of keys is very large (most elements have a
>>> unique key, while a few elements share a key).
>>>
>>> Thanks,
>>> Josh
>>>
>>
>>
>

Re: How to partition a stream by key before writing with FileBasedSink?

Posted by Josh <jo...@gmail.com>.
Hi Lukasz,

Thanks for the example. That sounds like a nice solution -
I am running on Dataflow though, which dynamically sets numShards - so if I
set numShards to 1 on each of those AvroIO writers, I can't be sure that
Dataflow isn't going to override my setting, right? I guess this should work
fine as long as I partition my stream into a large enough number of
partitions so that Dataflow won't override numShards.

Josh

On Wed, May 24, 2017 at 4:10 PM, Lukasz Cwik <lc...@google.com> wrote:

> Since your using a small number of shards, add a Partition transform which
> uses a deterministic hash of the key to choose one of 4 partitions. Write
> each partition with a single shard.
>
> (Fixed width diagram below)
> Pipeline -> AvroIO(numShards = 4)
> Becomes:
> Pipeline -> Partition --> AvroIO(numShards = 1)
>                       |-> AvroIO(numShards = 1)
>                       |-> AvroIO(numShards = 1)
>                       \-> AvroIO(numShards = 1)
>
> On Wed, May 24, 2017 at 1:05 AM, Josh <jo...@gmail.com> wrote:
>
>> Hi,
>>
>> I am using a FileBasedSink (AvroIO.write) on an unbounded stream
>> (withWindowedWrites, hourly windows, numShards=4).
>>
>> I would like to partition the stream by some key in the element, so that
>> all elements with the same key will get processed by the same shard writer,
>> and therefore written to the same file. Is there a way to do this? Note
>> that in my stream the number of keys is very large (most elements have a
>> unique key, while a few elements share a key).
>>
>> Thanks,
>> Josh
>>
>
>

Re: How to partition a stream by key before writing with FileBasedSink?

Posted by Lukasz Cwik <lc...@google.com>.
Since you're using a small number of shards, add a Partition transform which
uses a deterministic hash of the key to choose one of 4 partitions. Write
each partition with a single shard.

(Fixed width diagram below)
Pipeline -> AvroIO(numShards = 4)
Becomes:
Pipeline -> Partition --> AvroIO(numShards = 1)
                      |-> AvroIO(numShards = 1)
                      |-> AvroIO(numShards = 1)
                      \-> AvroIO(numShards = 1)
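
In code that could look roughly like this (a sketch only; MyElement, the key
accessor and the output paths are placeholders on my side):

PCollectionList<MyElement> partitions = elements.apply(
    Partition.of(4, (Partition.PartitionFn<MyElement>) (elem, numPartitions) ->
        // String.hashCode() is specified by the JLS, so it is stable across JVMs;
        // floorMod keeps the partition index non-negative
        Math.floorMod(elem.getKey().toString().hashCode(), numPartitions)));

for (int i = 0; i < partitions.size(); i++) {
  partitions.get(i).apply("WriteShard" + i,
      AvroIO.write(MyElement.class)
          .to("gs://my-bucket/output/shard-" + i + "/part")
          .withWindowedWrites()
          .withNumShards(1));
}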

On Wed, May 24, 2017 at 1:05 AM, Josh <jo...@gmail.com> wrote:

> Hi,
>
> I am using a FileBasedSink (AvroIO.write) on an unbounded stream
> (withWindowedWrites, hourly windows, numShards=4).
>
> I would like to partition the stream by some key in the element, so that
> all elements with the same key will get processed by the same shard writer,
> and therefore written to the same file. Is there a way to do this? Note
> that in my stream the number of keys is very large (most elements have a
> unique key, while a few elements share a key).
>
> Thanks,
> Josh
>