Posted to dev@apex.apache.org by "Ganelin, Ilya" <Il...@capitalone.com> on 2015/09/23 17:06:17 UTC

Support easy parallelization of pipelines

Hi all. Say that I have a DAG (A -> B -> C) and, given any DAG, I want to create a new DAG that is (A -> B -> C,  A -> B’ -> C’)

At the moment, Apex does not support the following operation:

dag.addStream("NAME", A.output, B.input);
dag.addStream("NAME2", A.output, B’.input);

It throws the following error:
java.lang.IllegalArgumentException: Operator A already connected to NAME

One has to do:
dag.addStream("name", A.output, B.input, B’.input);

Is there a concrete reason that the first approach is not supported? Is there any way to enable such an approach to allow iterative building of graphs? This would let me create a utility that automatically generates independent parallel pipelines from the same point of origin, which would greatly facilitate low-latency fault handling.

Is there a reason an operator may not be connected to multiple operators after initialization?

Appreciate any input, thanks!
________________________________________________________

The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates and may only be used solely in performance of work or services for Capital One. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer.

Re: Support easy parallelization of pipelines

Posted by Timothy Farkas <ti...@datatorrent.com>.
Hi Ilya

I am not sure what you mean by pass only the "first" tuple through. Can you
explain?

A good place to start looking at how to set partition keys, which control
whether the data sent to partitions is replicated, is here:
https://github.com/apache/incubator-apex-core/blob/devel-3/common/src/main/java/com/datatorrent/common/partitioner/StatelessPartitioner.java

It's been some time since I've worked with this myself so it would take me
some time to create an example. If you can wait a few days I'll create one
when I get the chance, otherwise maybe someone else on the dev list can
help out.

Thanks,
Tim

On Thu, Sep 24, 2015 at 1:51 PM, Ganelin, Ilya <Il...@capitalone.com>
wrote:

> Timothy - is there any chance you could share a code snippet or point to
> an example showing how to do this partitioning and comment on whether
> there’s an identifier we could use to identify and only pass along the
> “first” tuple through?
>
> Thanks in advance.

Re: Support easy parallelization of pipelines

Posted by "Ganelin, Ilya" <Il...@capitalone.com>.
Timothy - is there any chance you could share a code snippet or point to
an example showing how to do this partitioning and comment on whether
there’s an identifier we could use to identify and only pass along the
“first” tuple through?

Thanks in advance.
On 9/23/15, 12:08 PM, "Timothy Farkas" <ti...@datatorrent.com> wrote:

>That's a good point. With the dag.addStream("NAME", A.output, B.input,
>C.input); approach, the B and C pipelines would be independent, so if B
>failed, C would be unaffected. But if A failed, both pipelines would be
>affected. The same applies when setting a custom partitioner: a
>failure in one of the B pipelines would leave the other pipelines
>unaffected, but a failure in A would affect all the pipelines.
>
>Tim

Re: Support easy parallelization of pipelines

Posted by Timothy Farkas <ti...@datatorrent.com>.
That's a good point. With the dag.addStream("NAME", A.output, B.input,
C.input); approach, the B and C pipelines would be independent, so if B
failed, C would be unaffected. But if A failed, both pipelines would be
affected. The same applies when setting a custom partitioner: a
failure in one of the B pipelines would leave the other pipelines
unaffected, but a failure in A would affect all the pipelines.

Tim

On Wed, Sep 23, 2015 at 7:54 AM, Ganelin, Ilya <Il...@capitalone.com>
wrote:

> Thanks, Tim - I think I understand. You are proposing to do things at the
> operator level, rather than at the DAG level.
>
> B could then have 6 parallel partitions, some of which process the same
> data simultaneously.
> X1
> X1
> X2
> X2
> X3
> X3
>
> In this scenario, is it possible for the OPERATOR to fail, terminating all
> the pipelines, or is it only possible for an individual physical pipeline
> to fail, leaving the others unaffected?

Re: Support easy parallelization of pipelines

Posted by "Ganelin, Ilya" <Il...@capitalone.com>.
Thanks, Tim - I think I understand. You are proposing to do things at the
operator level, rather than at the DAG level.

B could then have 6 parallel partitions, some of which process the same
data simultaneously.
X1
X1
X2
X2
X3
X3

In this scenario, is it possible for the OPERATOR to fail, terminating all
the pipelines, or is it only possible for an individual physical pipeline
to fail, leaving the others unaffected?

On 9/23/15, 11:48 AM, "Timothy Farkas" <ti...@datatorrent.com> wrote:

>Hi Ilya,
>
>There are two ways to do this. You can do normal N x N partitioning, but
>write a partitioner for B that assigns the same partition keys to M
>partitions. Those M partitions will then receive the same data.
>An example of setting partition keys in a partitioner is here:
>
>https://github.com/apache/incubator-apex-core/blob/devel-3/common/src/main/java/com/datatorrent/common/partitioner/StatelessPartitioner.java
>
>An easier way is this:
>
>dag.addStream("NAME", A.output, B.input, C.input);
>
>You have to put all the input ports you want to receive the same data in
>the same stream declaration.
>
>Tim

Re: Support easy parallelization of pipelines

Posted by Timothy Farkas <ti...@datatorrent.com>.
Hi Ilya,

There are two ways to do this. You can do normal N x N partitioning, but
write a partitioner for B that assigns the same partition keys to M
partitions. Those M partitions will then receive the same data.
An example of setting partition keys in a partitioner is here:

https://github.com/apache/incubator-apex-core/blob/devel-3/common/src/main/java/com/datatorrent/common/partitioner/StatelessPartitioner.java
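
The replication idea can be sketched with a toy model. This is not Apex's Partitioner API: the class ToyPartitionRouting, its Keys type, and the replicated and route helpers are all hypothetical names made up for illustration. It assumes that a tuple is delivered to every partition whose key set contains the tuple's hash code ANDed with a bit mask, so partitions that are given the same key see the same tuples.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

// Toy model only: these classes are hypothetical and are not Apex's Partitioner API.
final class ToyPartitionRouting {

    // A bit mask plus the masked hash values that one partition accepts.
    static final class Keys {
        final int mask;
        final Set<Integer> accepted;

        Keys(int mask, Set<Integer> accepted) {
            this.mask = mask;
            this.accepted = accepted;
        }
    }

    // Builds groups * copies partitions; the copies within a group share one key.
    // groups must be a power of two so that (groups - 1) is a valid bit mask.
    static List<Keys> replicated(int groups, int copies) {
        if (groups <= 0 || (groups & (groups - 1)) != 0) {
            throw new IllegalArgumentException("groups must be a power of two");
        }
        List<Keys> partitions = new ArrayList<>();
        for (int g = 0; g < groups; g++) {
            for (int c = 0; c < copies; c++) {
                partitions.add(new Keys(groups - 1, Collections.singleton(g)));
            }
        }
        return partitions;
    }

    // Returns the indices of every partition that accepts the tuple's masked hash.
    static List<Integer> route(int tupleHash, List<Keys> partitions) {
        List<Integer> receivers = new ArrayList<>();
        for (int i = 0; i < partitions.size(); i++) {
            Keys keys = partitions.get(i);
            if (keys.accepted.contains(tupleHash & keys.mask)) {
                receivers.add(i);
            }
        }
        return receivers;
    }

    public static void main(String[] args) {
        List<Keys> partitions = replicated(4, 2); // 8 partitions, M = 2 copies per key
        // 5 & 3 == 1, so the tuple reaches both copies in group 1.
        System.out.println(route(5, partitions));
    }
}
```

In this sketch each tuple reaches exactly M partitions, which is the replication effect described above; how the real partitioner assigns keys should be checked against the linked StatelessPartitioner source.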

An easier way is this:

dag.addStream("NAME", A.output, B.input, C.input);

You have to put all the input ports you want to receive the same data in
the same stream declaration.
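
As a rough illustration of that rule, here is a toy model, not Apex's DAG class. ToyDag and receiversOf are hypothetical names, and ports are plain strings. It only mimics the behaviour reported in this thread: an output port can be the source of one stream, and that one stream can list several sinks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: ToyDag is a hypothetical stand-in, not Apex's DAG class.
final class ToyDag {
    // Maps each output port name to the name of the single stream it feeds.
    private final Map<String, String> streamBySource = new HashMap<>();
    // Maps each stream name to the input ports (sinks) that receive its tuples.
    private final Map<String, List<String>> sinksByStream = new HashMap<>();

    // Declares a stream; an output port may be the source of only one stream.
    void addStream(String name, String source, String... sinks) {
        String existing = streamBySource.get(source);
        if (existing != null) {
            throw new IllegalArgumentException(source + " already connected to " + existing);
        }
        streamBySource.put(source, name);
        sinksByStream.put(name, new ArrayList<>(Arrays.asList(sinks)));
    }

    // Returns the sinks that would receive every tuple emitted on the given output port.
    List<String> receiversOf(String source) {
        String stream = streamBySource.get(source);
        if (stream == null) {
            return Collections.emptyList();
        }
        return Collections.unmodifiableList(sinksByStream.get(stream));
    }

    public static void main(String[] args) {
        ToyDag dag = new ToyDag();
        // One declaration, two sinks: both B and C receive A's output.
        dag.addStream("NAME", "A.output", "B.input", "C.input");
        System.out.println(dag.receiversOf("A.output"));
        try {
            // A second stream from the same output port is rejected in this model.
            dag.addStream("NAME2", "A.output", "D.input");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```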

Tim

On Wed, Sep 23, 2015 at 7:39 AM, Ganelin, Ilya <Il...@capitalone.com>
wrote:

> Ram, thank you. I think this is a good starting point; however, it requires
> having access to the stream at creation time (as well as the operator
> being added). I’d ideally like to create a function:
>
> static void parallelize(DAG dag);
>
> This function would take the head of the DAG, and parallelize all
> downstream operators. It looks like at the moment, there is no interface
> within DAG to provide access to its operators or streams. Does such an
> interface exist or is this something I would need to expose? Was there a
> design decision to not expose these?

Re: Support easy parallelization of pipelines

Posted by "Ganelin, Ilya" <Il...@capitalone.com>.
Ram, thank you. I think this is a good starting point; however, it requires
having access to the stream at creation time (as well as the operator
being added). I’d ideally like to create a function:

static void parallelize(DAG dag);

This function would take the head of the DAG, and parallelize all
downstream operators. It looks like at the moment, there is no interface
within DAG to provide access to its operators or streams. Does such an
interface exist or is this something I would need to expose? Was there a
design decision to not expose these?


On 9/23/15, 11:26 AM, "Munagala Ramanath" <ra...@datatorrent.com> wrote:

>l


Re: Support easy parallelization of pipelines

Posted by Munagala Ramanath <ra...@datatorrent.com>.
The returned StreamMeta object has an addSink() method for adding
additional sinks.
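
A minimal sketch of what iterative building could look like with such a method, using a toy stand-in rather than the real StreamMeta. ToyStreamMeta, its constructor, sinks() and the fanOut helper are hypothetical; only the addSink name comes from this message. The point is that the stream is created once and further sinks are attached later, one at a time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Toy model only: ToyStreamMeta is a hypothetical stand-in, not Apex's StreamMeta.
final class ToyStreamMeta {
    private final String name;
    private final String source;
    private final List<String> sinks = new ArrayList<>();

    ToyStreamMeta(String name, String source) {
        this.name = name;
        this.source = source;
    }

    // Attaches one more input port to this stream and returns the stream for chaining.
    ToyStreamMeta addSink(String inputPort) {
        sinks.add(inputPort);
        return this;
    }

    // Read-only view of the sinks attached so far.
    List<String> sinks() {
        return Collections.unmodifiableList(sinks);
    }

    // Hypothetical helper: attaches the head of each extra branch to an existing stream.
    static void fanOut(ToyStreamMeta stream, List<String> branchInputs) {
        for (String input : branchInputs) {
            stream.addSink(input);
        }
    }

    @Override
    public String toString() {
        return name + ": " + source + " -> " + sinks;
    }

    public static void main(String[] args) {
        // The stream is declared once with its first sink...
        ToyStreamMeta stream = new ToyStreamMeta("NAME", "A.output").addSink("B.input");
        // ...and a parallel branch is attached afterwards, without a second stream.
        fanOut(stream, Arrays.asList("B2.input"));
        System.out.println(stream);
    }
}
```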

Ram

On Wed, Sep 23, 2015 at 8:06 AM, Ganelin, Ilya <Il...@capitalone.com>
wrote:

> Hi all. Say that I have a DAG (A -> B -> C) and, given any DAG, I want to
> create a new DAG that is (A -> B -> C,  A -> B’ -> C’)
>
> At the moment, Apex does not support the following operation:
>
> dag.addStream("NAME", A.output, B.input);
> dag.addStream("NAME2", A.output, B’.input);
>
> It throws the following error:
> java.lang.IllegalArgumentException: Operator A already connected to NAME
>
> One has to do:
> dag.addStream("name", A.output, B.input, B’.input);
>
> Is there a concrete reason that the first approach is not supported? Is
> there any way to enable such an approach to allow iterative building of
> graphs? This would allow me to create a utility that automatically
> generates independent parallel pipelines given the same point of origin
> which greatly facilitates low-latency fault handling.
>
> Is there a reason an operator may not be connected to multiple operators
> after initialization?
>
> Appreciate any input, thanks!

Re: Support easy parallelization of pipelines

Posted by "Ganelin, Ilya" <Il...@capitalone.com>.
Hi Tim - I understand this approach. However, in this case different data
flows through each partition of operator B, which is not my intent.
My intent is to have two independent pipelines operating on the exact same
set of data.

Each pipeline may itself be partitioned (for scalability).

The reason for this is that if an operation fails in one pipeline, the
piece of data that was being processed is still being processed by the
other pipeline, so there is no delay in processing. With a single pipeline,
the data is not lost, but we must wait for STRAM to restart the failed
operator and resume processing from the upstream BufferServer.
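
To make the distinction concrete, here is a toy sketch contrasting the two
behaviours: partitioning, where each partition sees a different subset of the
tuples, and fan-out, where every pipeline sees the full set. It does not use
any Apex classes. The class and method names are made up for illustration, and
the round-robin split is an assumption chosen for simplicity, since the actual
routing in Apex depends on how the stream is partitioned.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative model only; no Apex classes are involved.
class FanOutVersusPartition {

    /** Splits the tuples across n partitions (round-robin is an assumption). */
    static List<List<Integer>> partition(List<Integer> tuples, int n) {
        List<List<Integer>> parts = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            parts.add(new ArrayList<>());
        }
        for (int i = 0; i < tuples.size(); i++) {
            parts.get(i % n).add(tuples.get(i));
        }
        return parts;
    }

    /** Gives each of n independent pipelines its own copy of every tuple. */
    static List<List<Integer>> fanOut(List<Integer> tuples, int n) {
        List<List<Integer>> copies = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            copies.add(new ArrayList<>(tuples));
        }
        return copies;
    }

    public static void main(String[] args) {
        List<Integer> tuples = Arrays.asList(1, 2, 3, 4);
        System.out.println("partitioned: " + partition(tuples, 2));
        System.out.println("fanned out:  " + fanOut(tuples, 2));
    }
}
```

In the partitioned case, losing one partition stalls the tuples routed to it
until it is restarted. In the fan-out case, the other pipeline still holds
every tuple, which is the redundancy described above.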

Please let me know if that makes sense.

On 9/23/15, 11:31 AM, "Timothy Farkas" <ti...@datatorrent.com> wrote:

>Hi Ilya,
>
>If you want multiple parallel pipelines with the same operators and logic,
>you can enable parallel partitioning in your properties file.
>
>
>        <property>
>            <name>dt.operator.B.port.input.attr.PARTITION_PARALLEL</name>
>            <value>true</value>
>        </property>
>
>This will enable parallel partitioning for operator B. What this means is
>that if there are 8 partitions of A there will be 8 partitions of B, each
>partition of B is connected to exactly 1 upstream A operator (so this is
>not N x N partitioning). Furthermore if A gets dynamically repartitioned to
>4 operators, B will get repartitioned to 4 operators as well and all its
>downstream operators will get repartitioned appropriately.
>
>Thanks,
>Tim
>
>On Wed, Sep 23, 2015 at 7:06 AM, Ganelin, Ilya
><Il...@capitalone.com>
>wrote:
>
>> Hi all. Say that I have a DAG (A -> B -> C) and, given any DAG, I want to
>> create a new DAG that is (A -> B -> C,  A -> B’ -> C’)
>>
>> At the moment, Apex does not support the following operation:
>>
>> dag.addStream(“NAME”, A.output, B.input)
>> dag.addStream(“NAME2”, A.output, B’.input)
>>
>> It throws the following error:
>> java.lang.IllegalArgumentException: Operator A already connected to NAME
>>
>> One has to do :
>> dag.addStream(“name”, A.output,  B.input, B’.input).
>>
>> Is there a concrete reason that the first approach is not supported? Is
>> there any way to enable such an approach to allow iterative building of
>> graphs? This would allow me to create a utility that automatically
>> generates independent parallel pipelines given the same point of origin
>> which greatly facilitates low-latency fault handling.
>>
>> Is there a reason an operator may not be connected to multiple operators
>> after initialization?
>>
>> Appreciate any input, thanks!

Re: Support easy parallelization of pipelines

Posted by Timothy Farkas <ti...@datatorrent.com>.
Hi Ilya,

If you want multiple parallel pipelines with the same operators and logic,
you can enable parallel partitioning in your properties file.


        <property>
            <name>dt.operator.B.port.input.attr.PARTITION_PARALLEL</name>
            <value>true</value>
        </property>

This will enable parallel partitioning for operator B. If there are 8
partitions of A, there will be 8 partitions of B, and each partition of B is
connected to exactly 1 upstream partition of A (so this is not N x N
partitioning). Furthermore, if A gets dynamically repartitioned to 4
partitions, B will be repartitioned to 4 partitions as well, and all of its
downstream operators will be repartitioned accordingly.

Thanks,
Tim

On Wed, Sep 23, 2015 at 7:06 AM, Ganelin, Ilya <Il...@capitalone.com>
wrote:

> Hi all. Say that I have a DAG (A -> B -> C) and, given any DAG, I want to
> create a new DAG that is (A -> B -> C,  A -> B’ -> C’)
>
> At the moment, Apex does not support the following operation:
>
> dag.addStream(“NAME”, A.output, B.input)
> dag.addStream(“NAME2”, A.output, B’.input)
>
> It throws the following error:
> java.lang.IllegalArgumentException: Operator A already connected to NAME
>
> One has to do :
> dag.addStream(“name”, A.output,  B.input, B’.input).
>
> Is there a concrete reason that the first approach is not supported? Is
> there any way to enable such an approach to allow iterative building of
> graphs? This would allow me to create a utility that automatically
> generates independent parallel pipelines given the same point of origin
> which greatly facilitates low-latency fault handling.
>
> Is there a reason an operator may not be connected to multiple operators
> after initialization?
>
> Appreciate any input, thanks!