Posted to users@apex.apache.org by Sunil Parmar <sp...@threatmetrix.com> on 2017/02/21 18:05:30 UTC

Occasional Out of order tuples when emitting from a thread

Hi there,
We have the following setup:

  *   We have a generic operator that processes tuples on its input port.
  *   In the input port's process method, we check a condition:
     *   If the condition is met, the tuple is emitted to the next operator right away (in the process method).
     *   Otherwise, we store the tuple in a cache and use some threads that periodically check whether the condition has become true. Once it is true, the threads call the emit method on the stored tuples.

With this setup, we occasionally encounter the following error:
2017-02-15 17:29:09,364 ERROR com.datatorrent.stram.engine.GenericNode: Catastrophic Error: Out of sequence BEGIN_WINDOW tuple 58a4046100003b7f on port transformedJSON while expecting 58a4046100003b7e

Is there a way to make the above work correctly?
If not, can you recommend a better way of doing this?
How can we ensure window assignment is done synchronously before emitting tuples?

Thanks very much in advance...
-allan

Re: Occasional Out of order tuples when emitting from a thread

Posted by Munagala Ramanath <ra...@datatorrent.com>.
Please note that tuples should not be emitted by any thread other than the
main operator thread.

A common pattern is to use a thread-safe queue and have worker threads
enqueue tuples there; the main operator thread then pulls tuples from the
queue and emits them.
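
The hand-off described above can be sketched roughly as follows. This is a minimal plain-Java illustration, not Apex code: the class name QueueHandOffSketch, the methods tupleBecameReady and drainAndEmit, and the Consumer standing in for an output port's emit are all assumptions made for the example.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Plain-Java stand-in for an operator; it does not use the Apex API.
class QueueHandOffSketch {
    // Worker threads only ever add to this queue.
    private final ConcurrentLinkedQueue<String> ready = new ConcurrentLinkedQueue<>();
    // Hypothetical stand-in for an output port's emit method.
    private final Consumer<String> emit;

    QueueHandOffSketch(Consumer<String> emit) {
        this.emit = emit;
    }

    // Safe to call from any worker thread: it enqueues and never emits.
    void tupleBecameReady(String tuple) {
        ready.add(tuple);
    }

    // Intended to be called only from the operator thread.
    // Emits everything currently queued and returns how many tuples were emitted.
    int drainAndEmit() {
        int count = 0;
        String tuple;
        while ((tuple = ready.poll()) != null) {
            emit.accept(tuple);
            count++;
        }
        return count;
    }
}
```

In a real operator, drainAndEmit() would presumably be invoked from one of the callbacks that the platform runs on the operator thread, so that every emit happens inside a well-defined window.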

Ram


Re: Occasional Out of order tuples when emitting from a thread

Posted by Ashwin Chandra Putta <as...@gmail.com>.
Sunil,

You can poll the queue in endWindow(), since the process() method of the
input port is not called when there is no incoming tuple. endWindow(),
however, is called regardless of whether there are incoming tuples.
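
A rough sketch of that idea, in plain Java so it runs without Apex on the classpath. The class EndWindowDrainSketch and its fields are made up for illustration; only the callback names process() and endWindow() come from the discussion, and here they are called by hand rather than by the platform.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Stand-in that mimics the callback names; it does not extend any Apex class.
class EndWindowDrainSketch {
    // Filled by worker threads once a cached tuple's condition becomes true.
    final Queue<String> ready = new ConcurrentLinkedQueue<>();
    // Stand-in for an output port: records what would have been emitted.
    final List<String> emitted = new ArrayList<>();

    // Runs only when a tuple arrives, so it cannot be relied on to drain the queue.
    void process(String tuple) {
        emitted.add(tuple);
    }

    // Runs at the end of every window, with or without incoming tuples.
    void endWindow() {
        String tuple;
        while ((tuple = ready.poll()) != null) {
            emitted.add(tuple);
        }
    }
}
```

With this arrangement a delayed tuple waits, at most, until the window in which it became ready closes, even if no new input arrives during that window.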

Regards,
Ashwin.


Re: Occasional Out of order tuples when emitting from a thread

Posted by Vlad Rozov <v....@datatorrent.com>.
Please see the Threads section at
http://apex.apache.org/docs/apex/development_best_practices/

Note that an operator has only one thread, created by Apex, that it can
use to interact with the system. Starting with the 3.5.0 release, this is
enforced by the platform, which will throw an exception if an operator
tries to emit on a different thread.
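
To make the idea of such a check concrete, here is a hypothetical plain-Java sketch of a port that remembers its owning thread and rejects emits from any other thread. It is not Apex's implementation; the class ThreadCheckedPort, its constructor, and the choice of IllegalStateException are assumptions for illustration only.

```java
import java.util.function.Consumer;

// Hypothetical illustration only; this is not how Apex implements the check.
class ThreadCheckedPort<T> {
    private final Thread owner;       // the only thread allowed to emit
    private final Consumer<T> sink;   // stand-in for the downstream stream

    ThreadCheckedPort(Thread owner, Consumer<T> sink) {
        this.owner = owner;
        this.sink = sink;
    }

    void emit(T tuple) {
        // Fail fast instead of letting tuples land in the wrong window.
        if (Thread.currentThread() != owner) {
            throw new IllegalStateException(
                "emit called from " + Thread.currentThread().getName()
                + ", expected " + owner.getName());
        }
        sink.accept(tuple);
    }
}
```

Failing at the emit call points directly at the offending thread, whereas without a check the symptom shows up later and downstream, as with the out-of-sequence BEGIN_WINDOW error reported in this thread.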

Thank you,

Vlad


Re: Occasional Out of order tuples when emitting from a thread

Posted by Munagala Ramanath <ra...@datatorrent.com>.
To amplify Sandesh's answer a bit, the main operator thread invokes user
callbacks like beginWindow(), endWindow(), the process() method of input
ports, and emitTuples() in input operators.

Additionally, if the operator implements IdleTimeHandler and is deemed to
be idle, the handleIdleTime() callback will be called. All tuple output
must be done in one of these callbacks.

So you can check the thread-safe queue in any of these callbacks and emit
output tuples as needed.
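
As a minimal sketch of draining the queue from more than one of these callbacks: the interface IdleTimeHandlerStandIn below is a local stand-in declared only so the example compiles without Apex (a real operator would implement Operator.IdleTimeHandler instead), and IdleDrainSketch, its fields, and the short sleep when idle are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Local stand-in mirroring the handleIdleTime() callback named above.
interface IdleTimeHandlerStandIn {
    void handleIdleTime();
}

class IdleDrainSketch implements IdleTimeHandlerStandIn {
    // Filled by worker threads; drained only from callbacks.
    final Queue<String> ready = new ConcurrentLinkedQueue<>();
    // Stand-in for an output port: records what would have been emitted.
    final List<String> emitted = new ArrayList<>();

    private void drain() {
        String tuple;
        while ((tuple = ready.poll()) != null) {
            emitted.add(tuple);
        }
    }

    // Catches anything that became ready during the window.
    void endWindow() {
        drain();
    }

    // Called when the operator has no input to process.
    @Override
    public void handleIdleTime() {
        if (ready.isEmpty()) {
            // Assumption: back off briefly so repeated idle callbacks do not spin.
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return;
        }
        drain();
    }
}
```

Draining in both places means a delayed tuple can go out as soon as the operator is idle, rather than always waiting for the end of the window.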

Ram


Re: Occasional Out of order tuples when emitting from a thread

Posted by Sandesh Hegde <sa...@datatorrent.com>.
-Removed dev@

Operators can implement IdleTimeHandler.
https://www.datatorrent.com/docs/apidocs/com/datatorrent/api/Operator.IdleTimeHandler.html



Re: Occasional Out of order tuples when emitting from a thread

Posted by Sunil Parmar <sp...@threatmetrix.com>.
Ram,
Thanks for the prompt response. If we use the approach you suggested, we're dependent on the main thread's process call, i.e. tuples in the thread-safe queue only get processed when the main thread is processing incoming tuples. How can we explicitly call process when polling the delay queue?

Just for reference, here's a sample code snippet from our operator.


public class MyOperator extends BaseOperator implements
        Operator.ActivationListener<Context.OperatorContext> {

    .....

    @InputPortFieldAnnotation
    public transient DefaultInputPort<String> kafkaStreamInput =
            new DefaultInputPort<String>() {

        List<String> errors = new ArrayList<String>();

        @Override
        public void process(String consumerRecord) {
            // Code for normal tuple processing

            // Code to poll the thread-safe queue; this only runs when a
            // tuple arrives on this port
        }
    };
}


