Posted to users@apex.apache.org by Ananth Gundabattula <ag...@gmail.com> on 2016/04/19 21:20:27 UTC

A few questions about operator

Hello All,

I was wondering if you could help me with the following questions as I was
not able to locate the info from the docs:


   - When is the constructor of an operator called? The docs say once in
   the lifetime of an operator, but does "lifetime" span the
   start/stop/crash (because of a coding error) of an Apex application?
   - Regarding backpressure and the buffer server, how does the buffer
   server survive application crashes? If the buffer server itself dies
   for whatever reason, is it guaranteed that a downstream operator will
   eventually catch up with its upstream operator once the buffer server
   is brought back up?
   - Is there an equivalent of the OffsetManager for the 0.9 version of the
   Kafka operator?
   - Am I correct in assuming that the moment we rename an application, the
   semantics of the Kafka operator change completely and it might end up
   reading from the "initialOffset" set by the application code? How is
   "application name" defined for this purpose? Does every deploy of the
   application code result in a new application, or is the name in
   @ApplicationAnnotation(name="") what defines it?


Thanks for your time.


Regards,

Ananth

Re: A few questions about operator

Posted by Sandesh Hegde <sa...@datatorrent.com>.
Hello Ananth,

Here are my answers, except for the Kafka 0.9 operator.

1. "Lifetime" is essentially the same as for any object in Java code. It
doesn't span the start/stop/crash of an Apex application.

2. Yes, downstream will catch up.

4. Every launch is a new application unless it is restarted with the
following command-line option in dtcli (being renamed to apex-cli):

dtcli launch <jar/apa> -originalAppId <application id>

The application will then start from the previous checkpoint.

Thanks



Re: A few questions about operator

Posted by Ananth Gundabattula <ag...@gmail.com>.
Thanks all for your time and responses.


Re: A few questions about operator

Posted by "hsy541@gmail.com" <hs...@gmail.com>.
Hey Ananth,

There is no OffsetManager in the 0.9 Kafka operator because the operator
implicitly commits the offsets (for all data that has been processed by the
entire application) to Kafka. The offset is supposed to be stored with the
consumer id, and here the consumer id is the application name. So if you
don't want to restart the whole application from the last checkpoint but
still want the Kafka input operator to continue consuming from where the
application left off, you can start a new application with the same name.

Regards,
Siyuan
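
To make the link between the application name and the starting offset concrete, here is a minimal sketch in plain Java. It does not use the Kafka or Apex APIs; OffsetStore, commit() and startOffset() are hypothetical names invented for illustration, and the map stands in for the offsets that Kafka keeps per consumer id.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: NOT the Kafka or Apex API. All names here are hypothetical.
class OffsetByAppNameSketch {

    /** Stand-in for the offsets Kafka stores per consumer id. */
    static class OffsetStore {
        private final Map<String, Long> committed = new HashMap<>();

        /** Records the last processed offset under the given consumer id. */
        void commit(String consumerId, long offset) {
            committed.put(consumerId, offset);
        }

        /** Returns the committed offset, or the configured initial offset if none exists. */
        long startOffset(String consumerId, long initialOffset) {
            return committed.getOrDefault(consumerId, initialOffset);
        }
    }

    public static void main(String[] args) {
        OffsetStore store = new OffsetStore();
        // Assumption from the thread: the consumer id is the application name.
        store.commit("my-app", 42L);

        // A new launch with the same name resumes from the committed offset.
        System.out.println("same name:    " + store.startOffset("my-app", 0L));
        // A renamed application has no committed offset and falls back to the initial one.
        System.out.println("renamed app:  " + store.startOffset("my-app-v2", 0L));
    }
}
```

Under these assumptions, renaming the application behaves like starting a brand-new consumer, which is the situation the original question was worried about.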


Re: A few questions about operator

Posted by Ashwin Chandra Putta <as...@gmail.com>.
Hey Ananth,

Please find the answers inline.

Regards,
Ashwin.

On Tue, Apr 19, 2016 at 12:20 PM, Ananth Gundabattula <
agundabattula@gmail.com> wrote:

> Hello All,
>
> I was wondering if you could help me with the following questions as I was
> not able to locate the info from the docs:
>
>
>    - When is the constructor of an operator called ? The docs say once in
>    the lifetime of an operator but I was wondering whether the definition of
>    "lifetime" spans across start/stop/crash ( because of a coding error ) of
>    an apex application ?
>
[Ashwin] The operator has a life cycle: constructor --> checkpointed
state applied --> setup() --> loop {beginWindow() --> loop {process() for
each input port} --> endWindow()} --> teardown(). When an operator
recovers, the cycle is the same, so the constructor is called again. The
checkpointed state is the state from the last known checkpoint before the
operator crashed.
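
The cycle described above can be sketched with a toy operator in plain Java. This is not the Apex API: CountingOperator, restore() and runOnce() are hypothetical names, and the "checkpoint" is just a number passed in by hand. The point is only to show that a recovered operator is a fresh instance whose constructor runs again before the saved state is applied.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: NOT the Apex API. All names here are hypothetical.
class LifecycleSketch {

    /** Records each lifecycle call and keeps a running tuple count as its state. */
    static class CountingOperator {
        final List<String> calls = new ArrayList<>();
        long count; // the state that would be checkpointed

        CountingOperator() { calls.add("constructor"); }
        void restore(long checkpointedCount) { count = checkpointedCount; calls.add("restore"); }
        void setup() { calls.add("setup"); }
        void beginWindow(long windowId) { calls.add("beginWindow"); }
        void process(String tuple) { count++; calls.add("process"); }
        void endWindow() { calls.add("endWindow"); }
        void teardown() { calls.add("teardown"); }
    }

    /** Drives one operator instance through the full cycle, starting from a saved count. */
    static CountingOperator runOnce(long checkpointedCount, List<List<String>> windows) {
        CountingOperator op = new CountingOperator(); // fresh instance on every (re)deploy
        op.restore(checkpointedCount);                // checkpointed state applied
        op.setup();
        long windowId = 0;
        for (List<String> window : windows) {
            op.beginWindow(windowId++);
            for (String tuple : window) {
                op.process(tuple);
            }
            op.endWindow();
        }
        op.teardown();
        return op;
    }

    public static void main(String[] args) {
        // First run: no prior state, two windows.
        CountingOperator first = runOnce(0, Arrays.asList(
                Arrays.asList("a", "b"), Arrays.asList("c")));
        // Simulated recovery: new instance, state restored from a checkpoint taken at count 2.
        CountingOperator recovered = runOnce(2, Arrays.asList(Arrays.asList("c")));
        System.out.println(first.calls);
        System.out.println(recovered.calls + " count=" + recovered.count);
    }
}
```

In this sketch, anything assigned in the constructor is overwritten by restore(), which is one reason to treat the constructor as a place for defaults only.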

>
>    - Regarding backpressure and the buffer server, how does the buffer
>    server survive application crashes ? I mean considering a situation when
>    the bufferserver itself dies for whatever reason, is it guaranteed a
>    downstream operator will eventually catchup with an Upstream operator when
>    the buffer server is brought back up?
>
[Ashwin] The buffer server always lives with the upstream operator; in
fact, it runs within the same JVM as the upstream operator. If an operator
fails, its upstream operator's buffer server will have the required data
state. If the upstream operator fails as well, its own upstream operator's
buffer server has the data state, and so on.
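
A rough way to picture why a restarted downstream operator can catch up: the upstream side retains what it published, grouped by window, and can replay everything after the window the downstream operator last checkpointed. The sketch below is plain Java, not the Apex buffer server; UpstreamBuffer, purgeThrough() and replayAfter() are hypothetical names, and the purge step is an assumption added so the buffer does not grow without bound.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Toy model only: NOT the Apex buffer server. All names here are hypothetical.
class BufferReplaySketch {

    /** Buffer kept on the upstream side: published tuples grouped by window id. */
    static class UpstreamBuffer {
        private final TreeMap<Long, List<String>> windows = new TreeMap<>();

        /** Stores a tuple under the window in which it was emitted. */
        void publish(long windowId, String tuple) {
            windows.computeIfAbsent(windowId, id -> new ArrayList<>()).add(tuple);
        }

        /** Drops windows at or before windowId (assumed safe once downstream has checkpointed them). */
        void purgeThrough(long windowId) {
            windows.headMap(windowId, true).clear();
        }

        /** Returns, in window order, every retained tuple from windows strictly after windowId. */
        List<String> replayAfter(long windowId) {
            List<String> out = new ArrayList<>();
            for (List<String> tuples : windows.tailMap(windowId, false).values()) {
                out.addAll(tuples);
            }
            return out;
        }
    }

    public static void main(String[] args) {
        UpstreamBuffer buffer = new UpstreamBuffer();
        buffer.publish(1, "a");
        buffer.publish(2, "b");
        buffer.publish(2, "c");
        buffer.publish(3, "d");

        // Downstream checkpointed through window 1, then crashed.
        buffer.purgeThrough(1);
        // After restart it asks for everything after its last checkpointed window.
        System.out.println(buffer.replayAfter(1));
    }
}
```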

>
>    - Is there an equivalent of the OffsetManager for the 0.9 version of
>    the Kafka operator ?
>    - Am I correct in assuming that the moment we rename an application,
>    the semantics of the Kafka operator will completely change and might end up
>    reading from the "initialOffset" by the application code ? How is the
>    semantics maintained for a definition of "application name" ? Does every
>    deploy of the application code result in a new application or it is simply
>    using the @ApplicationAnnotation(name="") instance to define this meaning ?
>
[Ashwin] You can launch an application from its previous state by using
the -originalAppId parameter and providing the YARN application id of the
previous run. The launch restores that run's checkpointed state, which
should apply to all operators within the DAG, including the Kafka input
operator. You can also provide a new name for the application using the
attribute dt.attr.APPLICATION_NAME, e.g.:

launch pi-demo-3.4.0-incubating-SNAPSHOT.apa -originalAppId
application_1459879799578_8727 -Ddt.attr.APPLICATION_NAME="pidemo v201"

>
> Thanks for your time.
>
>
> Regards,
>
> Ananth
>



-- 

Regards,
Ashwin.