Posted to dev@apex.apache.org by David Yan <da...@datatorrent.com> on 2015/12/04 19:34:44 UTC

END_STREAM tuple at recovery

Hi all,

At recovery from a checkpoint, I see an END_STREAM control tuple as the
first tuple sent from upstream.  That seems a little counterintuitive
because the stream is actually starting.  Can someone shed some light on this?

Thanks!

David

Re: END_STREAM tuple at recovery

Posted by David Yan <da...@datatorrent.com>.
I see. Thanks Chetan.
The functionality of the END_STREAM tuple does not seem to represent the
end of a stream. What was the reason behind the choice of this name?

David
On Dec 8, 2015 1:27 AM, "Chetan Narsude (cnarsude)" <cn...@cisco.com>
wrote:

> David,
>
>   When it emits END_STREAM, WindowIdActivatedReservoir is replaced with
> the reservoir that is active for all the subsequent windows. The logic
> you quoted below works hand in hand with the END_STREAM handling in
> GenericNode. So during recovery, GenericNode will always see it as the
> first tuple, but the operator itself and downstream nodes will never see it.
>
> Not sure what you mean by:
> >Is it safe to treat the END_STREAM tuple just as RESET_TUPLE at recovery
> >from a checkpoint?
>
> --
> Chetan

Re: END_STREAM tuple at recovery

Posted by "Chetan Narsude (cnarsude)" <cn...@cisco.com>.
David,

  When it emits END_STREAM, WindowIdActivatedReservoir is replaced with
the reservoir that is active for all the subsequent windows. The logic
you quoted below works hand in hand with the END_STREAM handling in
GenericNode. So during recovery, GenericNode will always see it as the
first tuple, but the operator itself and downstream nodes will never see it.

Not sure what you mean by:
>Is it safe to treat the END_STREAM tuple just as RESET_TUPLE at recovery
>from a checkpoint?

--
Chetan
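
The hand-off described above can be pictured with a toy model. This is only
a sketch under assumptions, not the GenericNode code: the class and method
names (RecoveryHandoffSketch, run) and the use of plain strings as tuples
are hypothetical, chosen for illustration. It shows a consumer that reads
from a recovery-time source first, swaps to the regular source when it sees
the END_STREAM marker, and does not pass that marker on.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for the node-side handling; not the Apex classes.
class RecoveryHandoffSketch {
  static final String END_STREAM = "END_STREAM";

  /** Returns the tuples that would reach the operator. */
  static List<String> run(Iterator<String> recoverySource, Iterator<String> regularSource) {
    List<String> seenByOperator = new ArrayList<>();
    Iterator<String> active = recoverySource;
    while (active.hasNext()) {
      String tuple = active.next();
      if (active == recoverySource && END_STREAM.equals(tuple)) {
        // The marker only signals "recovery source is done": swap sources
        // and swallow the marker instead of forwarding it.
        active = regularSource;
        continue;
      }
      seenByOperator.add(tuple);
    }
    return seenByOperator;
  }

  public static void main(String[] args) {
    List<String> delivered = run(
        Arrays.asList(END_STREAM).iterator(),
        Arrays.asList("BEGIN_WINDOW 6", "payload", "END_WINDOW 6").iterator());
    System.out.println(delivered);
  }
}
```

In this model the node sees END_STREAM first, while the operator only ever
sees the tuples of the windows that follow.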



On 12/7/15, 5:37 PM, "David Yan" <da...@datatorrent.com> wrote:

>I traced this to the piece of code in WindowIdActivatedReservoir.java below.
>Chetan, please let me know if you recall the idea behind it.  The commit
>message for the EndStreamTuple constructor call says something about making
>a recovery test pass.
>
>@Override
>public Tuple sweep()
>{
>  Tuple t;
>  while ((t = reservoir.sweep()) != null) {
>    if (t.getType() == MessageType.BEGIN_WINDOW && t.getWindowId() > windowId) {
>      reservoir.setSink(sink);
>      return (est = new EndStreamTuple(windowId));
>    }
>    reservoir.remove();
>  }
>
>  return null;
>}


Re: END_STREAM tuple at recovery

Posted by David Yan <da...@datatorrent.com>.
I traced this to the piece of code in WindowIdActivatedReservoir.java below.
Chetan, please let me know if you recall the idea behind it.  The commit
message for the EndStreamTuple constructor call says something about making
a recovery test pass.

@Override
public Tuple sweep()
{
  Tuple t;
  while ((t = reservoir.sweep()) != null) {
    if (t.getType() == MessageType.BEGIN_WINDOW && t.getWindowId() > windowId) {
      reservoir.setSink(sink);
      return (est = new EndStreamTuple(windowId));
    }
    reservoir.remove();
  }

  return null;
}
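
To make the behavior of that loop easier to see in isolation, here is a
minimal, self-contained sketch. It is not the Apex code: Msg, Type, and
RecoverySweepSketch are hypothetical stand-ins, and it assumes that
reservoir.sweep() only peeks at the head tuple while reservoir.remove()
discards it. Under those assumptions, everything up to and including the
checkpointed window is dropped, and the first newer BEGIN_WINDOW triggers a
synthetic END_STREAM marker while staying queued for the next reader.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, simplified model of the sweep() logic quoted above.
class RecoverySweepSketch {
  enum Type { BEGIN_WINDOW, PAYLOAD, END_WINDOW, END_STREAM }

  static final class Msg {
    final Type type;
    final long windowId;

    Msg(Type type, long windowId) {
      this.type = type;
      this.windowId = windowId;
    }

    @Override
    public String toString() {
      return type + "(" + windowId + ")";
    }
  }

  /**
   * Drops queued messages until a BEGIN_WINDOW newer than the checkpointed
   * window is at the head, then returns an END_STREAM marker.
   * Returns null if the queue runs dry first.
   */
  static Msg sweep(Deque<Msg> queue, long checkpointWindowId) {
    Msg head;
    while ((head = queue.peek()) != null) {
      if (head.type == Type.BEGIN_WINDOW && head.windowId > checkpointWindowId) {
        // The newer BEGIN_WINDOW is left in the queue on purpose.
        return new Msg(Type.END_STREAM, checkpointWindowId);
      }
      queue.remove(); // already covered by the checkpoint
    }
    return null;
  }

  public static void main(String[] args) {
    Deque<Msg> queue = new ArrayDeque<>();
    queue.add(new Msg(Type.PAYLOAD, 5));
    queue.add(new Msg(Type.END_WINDOW, 5));
    queue.add(new Msg(Type.BEGIN_WINDOW, 6));
    queue.add(new Msg(Type.PAYLOAD, 6));
    Msg first = sweep(queue, 5);
    System.out.println(first + ", next in queue: " + queue.peek());
  }
}
```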

Re: END_STREAM tuple at recovery

Posted by David Yan <da...@datatorrent.com>.
Does anybody have any idea?  Is it safe to treat the END_STREAM tuple the
same as RESET_TUPLE at recovery from a checkpoint?

David
