Posted to dev@apex.apache.org by "York, Brennon" <Br...@capitalone.com> on 2015/08/27 19:59:48 UTC

Thread Local and Container Local

1. Does anyone have any sample code (or can point me to some) where we demonstrate thread local and container local operators? Answered my own question, check this out<https://github.com/apache/incubator-apex-core/blob/bdd7109519453e67789e4ec4025092a977d2b27c/engine/src/test/java/com/datatorrent/stram/stream/OiOStreamTest.java#L64>.
2. When doing thread or container local streams how does that work with dynamic (or differing sized) partitions between the two operators? Concretely, if I have a logical plan that looks like:

X[1] => Y[1]

And the physical plan looks like:

X[2] => Y[16]

How does the grouping work? Would it put 1 physical X operator and 8 physical Y operators in one grouping and the other set of physical operators in another grouping or does it do something else? And, for edification, where does the Apex code reside that does this work?

Thanks all!
________________________________________________________

The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates and may only be used solely in performance of work or services for Capital One. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer.

Re: Thread Local and Container Local

Posted by Amol Kekre <am...@datatorrent.com>.
Brennon,
Stream locality works well when there is a 1:1 connection in the physical
plan. For example:

Logical X[1] => Y[1] => Z[1]
could be physically done as
X[2] => Y[16] => Z[16]
or
X[16] => Y[16] => Z[16]

If Z can work on the same subset of data as its Y partition (i.e. it does
not need another shuffle), Y and Z can leverage thread, container, or node
locality, and so forth. Node locality can be used if resources have to be
distributed and you cannot rely on YARN being able to grant a container
with multiple cores or a very large memory size. On a dedicated cluster a
bigger container makes more sense than a node-local stream, as there is no
competition for resources. Putting X => Y => Z into a single container
makes sense if you want to avoid I/O cost, but then you may as well try for
X[P] => Y[P] => Z[P].

Based on resource requirements vis-a-vis container size, the number of
partitions is determined by the bottlenecked operator (which operator,
X/Y/Z, and which resource, RAM/CPU/IO). In a lot of cases these are
internal operators (not input adapters). With a dedicated cluster, this
equation changes a lot.
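
As a rough illustration of the sizing argument above, here is a minimal Java
sketch. It is not Apex code: the class, the method names, and all of the
resource numbers are made up for the example, and it assumes the partition
count is simply the per-operator demand divided by what one container
offers, rounded up, with the tightest resource winning.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sizing helper for illustration only; not part of the Apex API.
final class PartitionSizing {

    // Partitions needed so that each partition's share of ONE resource fits in a container.
    static int partitionsForResource(double demand, double capacityPerContainer) {
        if (demand <= 0 || capacityPerContainer <= 0) {
            throw new IllegalArgumentException("demand and capacity must be positive");
        }
        return (int) Math.ceil(demand / capacityPerContainer);
    }

    // The most constrained resource (the bottleneck) decides the count for an operator.
    static int partitionsForOperator(double[] demand, double[] capacityPerContainer) {
        if (demand.length != capacityPerContainer.length) {
            throw new IllegalArgumentException("one capacity value is required per resource");
        }
        int needed = 1;
        for (int i = 0; i < demand.length; i++) {
            needed = Math.max(needed, partitionsForResource(demand[i], capacityPerContainer[i]));
        }
        return needed;
    }

    public static void main(String[] args) {
        // Assumed container size: 8 GB RAM, 4 cores (made-up numbers).
        double[] container = {8.0, 4.0};

        // Assumed total demand per logical operator: {RAM in GB, cores}.
        Map<String, double[]> demand = new LinkedHashMap<>();
        demand.put("X", new double[] {4.0, 6.0});    // CPU bound
        demand.put("Y", new double[] {128.0, 20.0}); // RAM bound
        demand.put("Z", new double[] {16.0, 64.0});  // CPU bound

        for (Map.Entry<String, double[]> e : demand.entrySet()) {
            System.out.println(e.getKey() + "[" + partitionsForOperator(e.getValue(), container) + "]");
        }
    }
}
```

With these assumed numbers the sketch arrives at X[2], Y[16], Z[16], the
first physical plan shown earlier; with a different container size the
counts change, which is the point about dedicated clusters.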

With respect to getting X => Y => Z into parallel partitioning, the first
operator (the input adapter) dictates whether there will be a shuffle. If
the input data is load balanced (not key balanced) and events with the same
key need to go to the same downstream physical partition, then a shuffle is
unavoidable. If two events with the same key can be processed by different
partitions, you may be able to have the entire app in parallel partitions
as X[P] => Y[P] => Z[P], where P is the number of partitions. Each
partition can then be stream local. If not, X[P1] => Y[P2] => Z[P2] is the
way out.
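
The difference between the two layouts above can be sketched in a few lines
of Java. This is a simplified model, not how Apex routes tuples internally:
the class and method names are hypothetical, and the shuffle uses a plain
hash-modulo rule as a stand-in for whatever key partitioning the
application actually configures.

```java
// Hypothetical routing model for illustration only; not part of the Apex API.
final class StreamRouting {

    // Parallel partitioning: upstream partition i feeds only downstream partition i,
    // so events with the same key may land on different partitions.
    static int parallelTarget(int upstreamPartition) {
        return upstreamPartition;
    }

    // Key-based shuffle: the same key always reaches the same downstream partition,
    // regardless of which upstream partition emitted it.
    static int shuffleTarget(String key, int downstreamPartitions) {
        return Math.floorMod(key.hashCode(), downstreamPartitions);
    }

    // Upper bound on upstream-to-downstream links in the physical plan.
    static int linkCount(int upstreamPartitions, int downstreamPartitions, boolean parallel) {
        if (parallel) {
            if (upstreamPartitions != downstreamPartitions) {
                throw new IllegalArgumentException("parallel partitioning needs equal counts");
            }
            return upstreamPartitions; // one 1:1 link per partition, each can be stream local
        }
        return upstreamPartitions * downstreamPartitions; // every upstream may reach every downstream
    }

    public static void main(String[] args) {
        System.out.println("X[16] => Y[16] parallel links: " + linkCount(16, 16, true));
        System.out.println("X[2]  => Y[16] shuffle links:  " + linkCount(2, 16, false));
        System.out.println("key 'user-42' goes to Y#" + shuffleTarget("user-42", 16));
    }
}
```

In the parallel case every link is 1:1, which is what makes thread or
container locality straightforward; in the shuffle case each downstream
partition may receive from every upstream partition.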

Thks,
Amol



Re: Thread Local and Container Local

Posted by "York, Brennon" <Br...@capitalone.com>.
This is great stuff guys, thanks! Do we have the Apache Apex JIRA migrated
over yet? I'd like to get this into a JIRA as we look at updating the
documentation / provide use cases for things like this.




Re: Thread Local and Container Local

Posted by Chetan Narsude <ch...@datatorrent.com>.
Hi Brennon,

To speak in terms of the concrete example you gave (thanks for that):

Broadly speaking, the reasons for partitioning can be:

1. You want to use more resources to execute your business logic (divide
and conquer).
2. You want to simplify your logic by segregating the dataset.


Although 2 is a valid reason, it can easily be achieved by providing some
boilerplate code in your business logic. So we will mostly focus on reason
1.

So when X needs to be partitioned, you are explicitly stating that if
possible you need different resources for each of the partitions. The same
is true for Y. So Apex will try to allocate separate containers for each of
those instances.

So a Y instance being container/thread local to X is not possible at the
logical level.

However, when X gets partitioned 2 ways, each of the Y partitions now needs
a unifier which unifies 2 streams, one coming from each of the X instances.
This unifier, though, can be in the same container or the same thread as
the instance of Y. And that's the resolution Apex resorts to.

TL;DR: the locality is applied to the stream between the unifier and the
downstream instance.
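
To make the X[2] => Y[16] case concrete, here is a small Java model of the
grouping described above. It only mirrors the explanation in this message
and is not the PhysicalPlan logic itself: the class, method names, and
operator labels are hypothetical, and it assumes every X instance gets its
own container while each Y instance shares one with its unifier.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical placement model for illustration only; not the Apex PhysicalPlan code.
final class UnifierPlacement {

    // Each Y partition's unifier merges one stream from every X partition.
    static int unifierInputs(int xPartitions) {
        return xPartitions;
    }

    // Returns one list per deployment group (operators sharing a container or thread).
    static List<List<String>> plan(int xPartitions, int yPartitions) {
        List<List<String>> groups = new ArrayList<>();
        for (int i = 0; i < xPartitions; i++) {
            // Partitioned X instances are assumed to be deployed separately.
            groups.add(Arrays.asList("X#" + i));
        }
        for (int j = 0; j < yPartitions; j++) {
            // Locality applies between the unifier and its downstream Y instance.
            groups.add(Arrays.asList("unifier#" + j, "Y#" + j));
        }
        return groups;
    }

    public static void main(String[] args) {
        List<List<String>> groups = plan(2, 16);
        for (List<String> group : groups) {
            System.out.println(group);
        }
        System.out.println("groups: " + groups.size()
            + ", inputs per unifier: " + unifierInputs(2));
    }
}
```

Under these assumptions the answer to the original question is that X and Y
are not split into two groups of 1 X plus 8 Y; instead there are 16
unifier-plus-Y groups, each fed by both X instances.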


The code is in com.datatorrent.stram.plan.physical.PhysicalPlan;
specifically, follow the trail from the setLocalityGrouping method.

--
Chetan
