You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@apex.apache.org by "Pramod Immaneni (JIRA)" <ji...@apache.org> on 2016/02/17 21:53:18 UTC

[jira] [Commented] (APEXCORE-348) Load based stream partitioning

    [ https://issues.apache.org/jira/browse/APEXCORE-348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15151156#comment-15151156 ] 

Pramod Immaneni commented on APEXCORE-348:
------------------------------------------

Conversation from email

Pramod Immaneni <pr...@datatorrent.com>
Inline

On Thu, Feb 11, 2016 at 4:32 PM, Timothy Farkas <ti...@datatorrent.com> wrote:
Comments inline

+1 Overall as well provided Apex-339 is implemented first and it is
documented that the mechanism should not be used with some stateful
operators.

On Thu, Feb 11, 2016 at 4:20 PM, Pramod Immaneni <pr...@datatorrent.com>
wrote:

> Comments inline
>
> On Thu, Feb 11, 2016 at 4:13 PM, Timothy Farkas <ti...@datatorrent.com>
> wrote:
>
> > Hey Pramod,
> >
> > I agree if APEX-339 is in place then it would work without redeploying
> > containers for operators that are Stateless, or a subset of Stateful
> > operators.
> >
> > Addressing your previous questions.
> >
> > - The StatsListener can be used to see how far behind operators are. You
> > could determine what window the operator is on, or the number of tuples
> > it's processed so far, or how long
> > it takes it to complete a window.
> >
>
> What if tuples are different sizes and number of tuples processed doesn't
> reflect how far ahead or behind a downstream partition is? How is the
> information from StatsListener made available to the upstream partition
> codecs.
>
What is the information Buffer Server can provide that the StatsListener
cannot?

The stats information would have to be relayed down to the upstream operators. It's possible.
 

The StatsListener can trigger a repartition. The information in the
StatsListener can be shared
with the partitioner by setting the same object for both in populate Dag.
The partitioner can then
compute the new Stream Codec. The mechanism by which the upstream would be
updated with the new
Stream Codec would have to be implemented as it's currently not there.

>
>
> >
> > - Some examples of Stateful operators that require repartitioning of
> state
> > are the following:
> >       - Deduper
> >            In this case after updating the stream codec the operator may
> > allow a previously seen value to pass because the partition didn't
> receive
> > that value with the previous stream codec.
> >       - A key value store that holds aggregations for each key.
> >            In this case multiple partitions would hold partial
> aggregations
> > for a key, when they are expecting to hold the complete aggregation.
> >
>
> Agreed for deduper. For the second case a unifier is a better approach so
> that you are not affected by key skew in general.
>
This is not always possible. We can discuss this offline, since it won't
add much to the discussion here to go into the details.


Yes not always.


> Load based stream partitioning
> ------------------------------
>
>                 Key: APEXCORE-348
>                 URL: https://issues.apache.org/jira/browse/APEXCORE-348
>             Project: Apache Apex Core
>          Issue Type: Improvement
>            Reporter: Pramod Immaneni
>            Assignee: Pramod Immaneni
>
> There are scenarios where the downstream partitions of an upstream operator are generally not performing uniformly resulting in an overall sub-optimal performance dictated by the slowest partitions. The reasons could be data related such as some partitions are receiving more data to process than the others or could be environment related such as some partitions are running slower than others because they are on heavily loaded nodes.
> A solution based on currently available functionality in the engine would be to write a StreamCodec implementation to distribute data among the partitions such that each partition is receiving similar amount of data to process. We should consider adding StreamCodecs like these to the library but these however do not solve the problem when it is environment related.
> For that a better and more comprehensive approach would be look at how data is being consumed by the downstream partitions from the BufferServer and use that information to make decisions on how to send future data. If some partitions are behind others in consuming data then data can be directed to the other partitions. One way to do this would be to relay this type of statistical and positional information from BufferServer to the upstream publishers. The publishers can use this information in ways such as making it available to StreamCodecs to affect destination of future data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)