Posted to dev@samza.apache.org by Jagadish Venkatraman <ja...@gmail.com> on 2016/08/24 21:03:50 UTC

Re: Review Request 51346: SAMZA-974 - Support finite datasources in Samza that have a notion of End-Of-Stream

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51346/
-----------------------------------------------------------

(Updated Aug. 24, 2016, 9:03 p.m.)


Review request for samza, Boris Shkolnik, Chris Pettitt, Yi Pan (Data Infrastructure), Navina Ramesh, and Xinyu Liu.


Changes
-------

- Prevent future polls for SSPs that have reached end of stream.
- Support the notion of endOfStream when iterating over messages in an SSP using a SystemStreamPartitionIterator
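
A minimal sketch of the second change, assuming a sentinel offset marks the end of a partition. `SentinelEnvelope`, `BoundedIterator`, and the `"END_OF_STREAM"` value are hypothetical names chosen for illustration, and the sketch reads from an in-memory queue rather than polling a consumer as SystemStreamPartitionIterator does.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;

// Hypothetical stand-in for an incoming message; not Samza's IncomingMessageEnvelope.
final class SentinelEnvelope {
  static final String END_OF_STREAM = "END_OF_STREAM"; // assumed sentinel offset
  final String offset;
  final String message;

  SentinelEnvelope(String offset, String message) {
    this.offset = offset;
    this.message = message;
  }

  boolean isEndOfStream() {
    return END_OF_STREAM.equals(offset);
  }
}

// Iterates over buffered envelopes and stops for good once the sentinel is seen.
final class BoundedIterator implements Iterator<SentinelEnvelope> {
  private final Queue<SentinelEnvelope> buffer;
  private boolean endOfStream = false;

  BoundedIterator(Queue<SentinelEnvelope> buffer) {
    this.buffer = buffer;
  }

  @Override
  public boolean hasNext() {
    if (endOfStream) {
      return false; // never look at the source again after end-of-stream
    }
    SentinelEnvelope head = buffer.peek();
    if (head != null && head.isEndOfStream()) {
      buffer.poll(); // consume the sentinel; it is not handed to the caller
      endOfStream = true;
      return false;
    }
    return head != null;
  }

  @Override
  public SentinelEnvelope next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return buffer.poll();
  }
}
```

Once the sentinel has been seen, `hasNext()` keeps returning false even if more envelopes are appended to the queue, which is the "no future polls" behaviour in miniature.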


Repository: samza


Description
-------

Samza currently works with unbounded data sources (Kafka streams). However, for bounded data sources such as HDFS files or snapshot files, we need a notion of 'end-of-stream'.

This is a step towards a 'finite' Samza job that terminates once data processing is complete (as opposed to an infinite stream job that keeps running).

RB changes:
- New interface for EndOfStreamListener
- New 'end-of-stream' state in the state machine of AsyncStreamTask (invariant: when end-of-stream is reached, there are no buffered messages, no callbacks are in flight, and no window or commit call is in progress)
- Changes to allow clean shut-downs of the tasks/container/job for end-of-stream.
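
The invariant in the second bullet can be read as a single predicate over per-task state. The sketch below is illustrative only: `InvariantTaskState` and its fields are hypothetical and far simpler than the state kept by AsyncRunLoop.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical per-task bookkeeping; field names are assumptions for illustration.
final class InvariantTaskState {
  private final Deque<String> bufferedMessages = new ArrayDeque<>();
  private int callbacksInFlight = 0;
  private boolean windowOrCommitInProgress = false;
  private boolean allPartitionsFinished = false;

  void buffer(String message) {
    bufferedMessages.add(message);
  }

  // Hands one buffered message to the task; its callback is now outstanding.
  void startProcessing() {
    bufferedMessages.remove();
    callbacksInFlight++;
  }

  void callbackCompleted() {
    callbacksInFlight--;
  }

  void setWindowOrCommitInProgress(boolean inProgress) {
    windowOrCommitInProgress = inProgress;
  }

  void markAllPartitionsFinished() {
    allPartitionsFinished = true;
  }

  // End-of-stream holds only when nothing is buffered, in flight, or in progress.
  boolean isEndOfStream() {
    return allPartitionsFinished
        && bufferedMessages.isEmpty()
        && callbacksInFlight == 0
        && !windowOrCommitInProgress;
  }
}
```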

Design Doc and Implementation Notes: https://issues.apache.org/jira/secure/attachment/12825119/ProposalforEndofStreaminSamza.pdf


Diffs (updated)
-----

  samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java cc860cf7eb4d514736913c1dceaa80534b61d71a 
  samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java a8f858aa7e4f4ce436f450cf439fe1a102983c64 
  samza-api/src/main/java/org/apache/samza/task/EndOfStreamListenerTask.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java a510bb0c5914c772438930d27f100b4d360c1296 
  samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 89f6857014489aba2db4129bc2e26dfec5b10652 
  samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala a8355b944cad54faacf5eeb883d8f4b630440757 
  samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java ca913dea79fecbcecdfd1010dc794318055c5764 

Diff: https://reviews.apache.org/r/51346/diff/


Testing
-------

Unit tests cover in-order processing, out-of-order processing, and commit semantics.


Thanks,

Jagadish Venkatraman


Re: Review Request 51346: SAMZA-974 - Support finite datasources in Samza that have a notion of End-Of-Stream

Posted by Jagadish Venkatraman <ja...@gmail.com>.

> On Aug. 25, 2016, 6:13 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java, line 306
> > <https://reviews.apache.org/r/51346/diff/2/?file=1485172#file1485172line306>
> >
> >     I think we only need the size(). Is the underlying data structure mutable?

I'd expect a counter to work, even with a rogue consumer that keeps returning a list of end-of-stream messages, because you would not `update` the `Chooser`. Still, I wanted to be defensive. The new draft uses a single pendingSspSet. I'll give it some more thought later and see if we can change it.
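
To make the trade-off concrete: a plain countdown relies on each SSP reporting end-of-stream at most once, while a set of pending SSPs tolerates duplicates. The sketch below uses hypothetical classes and plain strings in place of SystemStreamPartition; it is not the code in the draft.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Countdown variant: correct only if every partition reports end-of-stream once.
final class CountdownTracker {
  private int remaining;

  CountdownTracker(int partitionCount) {
    this.remaining = partitionCount;
  }

  void onEndOfStream(String ssp) {
    remaining--;
  }

  boolean allDone() {
    return remaining <= 0;
  }
}

// Set variant: removing an already-removed partition is a no-op, so duplicates are harmless.
final class PendingSetTracker {
  private final Set<String> pending;

  PendingSetTracker(Collection<String> partitions) {
    this.pending = new HashSet<>(partitions);
  }

  void onEndOfStream(String ssp) {
    pending.remove(ssp);
  }

  boolean allDone() {
    return pending.isEmpty();
  }
}
```

With two partitions and a duplicate end-of-stream message for one of them, the countdown reaches zero early, while the set still waits for the other partition.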


- Jagadish


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51346/#review146813
-----------------------------------------------------------


On Aug. 25, 2016, 11:43 p.m., Jagadish Venkatraman wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/51346/
> -----------------------------------------------------------
> 
> (Updated Aug. 25, 2016, 11:43 p.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Yi Pan (Data Infrastructure), Navina Ramesh, and Xinyu Liu.
> 
> 
> Repository: samza
> 
> 
> Diffs
> -----
> 
>   samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java cc860cf7eb4d514736913c1dceaa80534b61d71a 
>   samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java a8f858aa7e4f4ce436f450cf439fe1a102983c64 
>   samza-api/src/main/java/org/apache/samza/task/EndOfStreamListenerTask.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java a510bb0c5914c772438930d27f100b4d360c1296 
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 89f6857014489aba2db4129bc2e26dfec5b10652 
>   samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java ca913dea79fecbcecdfd1010dc794318055c5764 
> 
> Diff: https://reviews.apache.org/r/51346/diff/
> 
> 
> Testing
> -------
> 
> Unit tests to test scenarios for inorder processing, out-of-order processing and commit semantics.
> 
> 
> Thanks,
> 
> Jagadish Venkatraman
> 
>


Re: Review Request 51346: SAMZA-974 - Support finite datasources in Samza that have a notion of End-Of-Stream

Posted by Jagadish Venkatraman <ja...@gmail.com>.

> On Aug. 25, 2016, 6:13 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala, line 140
> > <https://reviews.apache.org/r/51346/diff/2/?file=1485174#file1485174line140>
> >
> >     Minor: Add documentation about what this is.

All changes to SystemConsumers have been reverted, since we guarantee that there will be no subsequent poll() call once the endOfStream offset has been reached.


> On Aug. 25, 2016, 6:13 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala, line 62
> > <https://reviews.apache.org/r/51346/diff/2/?file=1485173#file1485173line62>
> >
> >     Minor: isEndOfStreamListenerTask.
> >     
> >     Will make the usage at AsyncRunLoop:363 clearer.

Fixed - thanks for the careful review! :)


> On Aug. 25, 2016, 6:13 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java, line 567
> > <https://reviews.apache.org/r/51346/diff/2/?file=1485172#file1485172line567>
> >
> >     Minor: move inside if block.

Fixed, thanks!


> On Aug. 25, 2016, 6:13 p.m., Prateek Maheshwari wrote:
> > samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java, line 29
> > <https://reviews.apache.org/r/51346/diff/2/?file=1485169#file1485169line29>
> >
> >     Minor: s/is/should not be

Fixed. Thanks for the review.


- Jagadish


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51346/#review146813
-----------------------------------------------------------


Re: Review Request 51346: SAMZA-974 - Support finite datasources in Samza that have a notion of End-Of-Stream

Posted by Prateek Maheshwari <pm...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51346/#review146813
-----------------------------------------------------------




samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java (line 29)
<https://reviews.apache.org/r/51346/#comment213511>

    Minor: s/is/should not be



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 305)
<https://reviews.apache.org/r/51346/#comment213517>

    I think we only need the size(). Is the underlying data structure mutable?



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 542)
<https://reviews.apache.org/r/51346/#comment213530>

    Would a simple countdown from initial #SSPs be enough here? We shouldn't get more than one EOS message per ssp.



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 564)
<https://reviews.apache.org/r/51346/#comment213529>

    Minor: move inside if block.



samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala (line 62)
<https://reviews.apache.org/r/51346/#comment213532>

    Minor: isEndOfStreamListenerTask.
    
    Will make the usage at AsyncRunLoop:363 clearer.



samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala (line 140)
<https://reviews.apache.org/r/51346/#comment213538>

    Minor: Add documentation about what this is.



samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala (line 354)
<https://reviews.apache.org/r/51346/#comment213533>

    Just wondering, is 'checkX' our naming convention for methods with a check + side effects?


- Prateek Maheshwari


On Aug. 24, 2016, 2:03 p.m., Jagadish Venkatraman wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/51346/
> -----------------------------------------------------------
> 
> (Updated Aug. 24, 2016, 2:03 p.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Yi Pan (Data Infrastructure), Navina Ramesh, and Xinyu Liu.
> 
> 
> Repository: samza
> 
> 
> Diffs
> -----
> 
>   samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java cc860cf7eb4d514736913c1dceaa80534b61d71a 
>   samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java a8f858aa7e4f4ce436f450cf439fe1a102983c64 
>   samza-api/src/main/java/org/apache/samza/task/EndOfStreamListenerTask.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java a510bb0c5914c772438930d27f100b4d360c1296 
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 89f6857014489aba2db4129bc2e26dfec5b10652 
>   samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala a8355b944cad54faacf5eeb883d8f4b630440757 
>   samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java ca913dea79fecbcecdfd1010dc794318055c5764 
> 
> Diff: https://reviews.apache.org/r/51346/diff/
> 
> 
> Testing
> -------
> 
> Unit tests to test scenarios for inorder processing, out-of-order processing and commit semantics.
> 
> 
> Thanks,
> 
> Jagadish Venkatraman
> 
>


Re: Review Request 51346: SAMZA-974 - Support finite datasources in Samza that have a notion of End-Of-Stream

Posted by Xinyu Liu <xi...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51346/#review146712
-----------------------------------------------------------




samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 350)
<https://reviews.apache.org/r/51346/#comment213290>

    remove



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 558)
<https://reviews.apache.org/r/51346/#comment213287>

    boolean isEndOfStream() {
      ...
      return ssps.isEmpty() && pendingEnvelopeQueue.isEmpty() && messagesInFlight == 0;
    }



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 580)
<https://reviews.apache.org/r/51346/#comment213288>

    endOfStream = isEndOfStream();



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 605)
<https://reviews.apache.org/r/51346/#comment213289>

    if(endOfStream) ...



samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala (line 355)
<https://reviews.apache.org/r/51346/#comment213294>

    No need for this logic since AsyncRunLoop won't poll anymore.


- Xinyu Liu


On Aug. 24, 2016, 9:03 p.m., Jagadish Venkatraman wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/51346/
> -----------------------------------------------------------
> 
> (Updated Aug. 24, 2016, 9:03 p.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Yi Pan (Data Infrastructure), Navina Ramesh, and Xinyu Liu.
> 
> 
> Repository: samza
> 
> 
> Diffs
> -----
> 
>   samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java cc860cf7eb4d514736913c1dceaa80534b61d71a 
>   samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java a8f858aa7e4f4ce436f450cf439fe1a102983c64 
>   samza-api/src/main/java/org/apache/samza/task/EndOfStreamListenerTask.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java a510bb0c5914c772438930d27f100b4d360c1296 
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 89f6857014489aba2db4129bc2e26dfec5b10652 
>   samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala a8355b944cad54faacf5eeb883d8f4b630440757 
>   samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java ca913dea79fecbcecdfd1010dc794318055c5764 
> 
> Diff: https://reviews.apache.org/r/51346/diff/
> 
> 
> Testing
> -------
> 
> Unit tests to test scenarios for inorder processing, out-of-order processing and commit semantics.
> 
> 
> Thanks,
> 
> Jagadish Venkatraman
> 
>


Re: Review Request 51346: SAMZA-974 - Support finite datasources in Samza that have a notion of End-Of-Stream

Posted by Jagadish Venkatraman <ja...@gmail.com>.

> On Aug. 26, 2016, 5:06 p.m., Xinyu Liu wrote:
> > samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java, line 540
> > <https://reviews.apache.org/r/51346/diff/4/?file=1486378#file1486378line540>
> >
> >     I think it might be cleaner to have two flags, one indicating endOfStream, the other indicating the task is complete. So we can directly set the endOfStream flags based on empty ssps. And the other flag can be set from the taskWorker. Let's chat offline.

I've addressed this feedback in the revised RB.


- Jagadish


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51346/#review146982
-----------------------------------------------------------


On Aug. 29, 2016, 11:47 p.m., Jagadish Venkatraman wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/51346/
> -----------------------------------------------------------
> 
> (Updated Aug. 29, 2016, 11:47 p.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Yi Pan (Data Infrastructure), Navina Ramesh, and Xinyu Liu.
> 
> 
> Repository: samza
> 
> 
> Diffs
> -----
> 
>   build.gradle 1d4eb74b1294318db8454631ddd0901596121ab2 
>   samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java cc860cf7eb4d514736913c1dceaa80534b61d71a 
>   samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java a8f858aa7e4f4ce436f450cf439fe1a102983c64 
>   samza-api/src/main/java/org/apache/samza/task/EndOfStreamListenerTask.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java a510bb0c5914c772438930d27f100b4d360c1296 
>   samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java 1fc645673b7547b642830df5639c0b4fcd11c0d5 
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 89f6857014489aba2db4129bc2e26dfec5b10652 
>   samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java ca913dea79fecbcecdfd1010dc794318055c5764 
> 
> Diff: https://reviews.apache.org/r/51346/diff/
> 
> 
> Testing
> -------
> 
> Unit tests to test scenarios for inorder processing, out-of-order processing and commit semantics.
> 
> 
> Thanks,
> 
> Jagadish Venkatraman
> 
>


Re: Review Request 51346: SAMZA-974 - Support finite datasources in Samza that have a notion of End-Of-Stream

Posted by Xinyu Liu <xi...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51346/#review146982
-----------------------------------------------------------




samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 537)
<https://reviews.apache.org/r/51346/#comment213925>

    I think it might be cleaner to have two flags, one indicating endOfStream, the other indicating the task is complete. So we can directly set the endOfStream flags based on empty ssps. And the other flag can be set from the taskWorker. Let's chat offline.
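
A rough sketch of the two-flag idea, under the assumption that one flag follows directly from the set of unfinished SSPs and the other is set by the worker once its own work has drained. `TwoFlagTaskState` and its members are hypothetical names, not fields from the patch.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the two-flag idea; names are assumptions, not the patch's fields.
final class TwoFlagTaskState {
  private final Set<String> sspsInProgress;
  private int messagesInFlight = 0;
  private boolean endOfStream = false; // all input partitions are exhausted
  private boolean complete = false;    // the task has also drained its own work

  TwoFlagTaskState(Set<String> ssps) {
    this.sspsInProgress = new HashSet<>(ssps);
  }

  // Run-loop side: the flag follows directly from the set becoming empty.
  void onPartitionFinished(String ssp) {
    sspsInProgress.remove(ssp);
    endOfStream = sspsInProgress.isEmpty();
  }

  void onMessageDispatched() {
    messagesInFlight++;
  }

  // Worker side: called when a process callback returns.
  void onCallbackCompleted() {
    messagesInFlight--;
    refreshComplete();
  }

  // Worker side: also callable when idle, for the case where nothing was in flight.
  void refreshComplete() {
    if (endOfStream && messagesInFlight == 0) {
      complete = true;
    }
  }

  boolean isEndOfStream() {
    return endOfStream;
  }

  boolean isComplete() {
    return complete;
  }
}
```

Keeping the flags separate lets the run loop stop polling as soon as `endOfStream` is true, while shutdown waits for `complete`.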



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 541)
<https://reviews.apache.org/r/51346/#comment213926>

    I understand what you mean here, but the naming may be a little confusing. sspsToProcess or sspsInProgress? Please come up with a name that has a clear meaning.



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 612)
<https://reviews.apache.org/r/51346/#comment213927>

    I have an idea for integrating this logic into isReady(), since this is pretty much the condition for endOfStream. Let's chat offline.


- Xinyu Liu


On Aug. 25, 2016, 11:52 p.m., Jagadish Venkatraman wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/51346/
> -----------------------------------------------------------
> 
> (Updated Aug. 25, 2016, 11:52 p.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Yi Pan (Data Infrastructure), Navina Ramesh, and Xinyu Liu.
> 
> 
> Repository: samza
> 
> 
> Diffs
> -----
> 
>   samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java cc860cf7eb4d514736913c1dceaa80534b61d71a 
>   samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java a8f858aa7e4f4ce436f450cf439fe1a102983c64 
>   samza-api/src/main/java/org/apache/samza/task/EndOfStreamListenerTask.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java a510bb0c5914c772438930d27f100b4d360c1296 
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 89f6857014489aba2db4129bc2e26dfec5b10652 
>   samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java ca913dea79fecbcecdfd1010dc794318055c5764 
> 
> Diff: https://reviews.apache.org/r/51346/diff/
> 
> 
> Testing
> -------
> 
> Unit tests to test scenarios for inorder processing, out-of-order processing and commit semantics.
> 
> 
> Thanks,
> 
> Jagadish Venkatraman
> 
>


Re: Review Request 51346: SAMZA-974 - Support finite datasources in Samza that have a notion of End-Of-Stream

Posted by Jagadish Venkatraman <ja...@gmail.com>.

> On Aug. 29, 2016, 4:19 p.m., Jake Maes wrote:
> > samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java, line 79
> > <https://reviews.apache.org/r/51346/diff/4/?file=1486376#file1486376line79>
> >
> >     Would it be better to move this method to IncomingMessageEnvelope?
> >     
> >     Then END_OF_STREAM_OFFSET could be made private and would be an interchangeable implementation detail. 
> >     
> >     It also means this END_OF_STREAM_OFFSET.equals(...) bit wouldn't need to be repeated in AsyncRunLoop.

Thanks for the input! I've fixed this as you suggested.

I did not have a strong opinion on this when I was implementing it. One downside is that users can now invoke isEndOfStream() on the envelope, even though they should not care about end-of-stream messages, which are purely internal to Samza. A reasonable alternative would be to make isEndOfStream() package-private; however, it is used in multiple places outside the API package.
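
For illustration, the suggestion amounts to something like the following sketch. `SketchEnvelope`, its factory method, and the sentinel value are hypothetical and only mirror the idea; they are not the contents of IncomingMessageEnvelope.

```java
import java.util.Objects;

// Hypothetical envelope; mirrors the idea discussed here, not Samza's actual class.
final class SketchEnvelope {
  // Private, so the sentinel value can change without touching callers.
  private static final String END_OF_STREAM_OFFSET = "END_OF_STREAM";

  private final String partition;
  private final String offset;
  private final Object message;

  SketchEnvelope(String partition, String offset, Object message) {
    this.partition = Objects.requireNonNull(partition);
    this.offset = offset;
    this.message = message;
  }

  // The only way to create the sentinel, so callers never see the constant.
  static SketchEnvelope endOfStream(String partition) {
    return new SketchEnvelope(partition, END_OF_STREAM_OFFSET, null);
  }

  // Single place for the comparison, so callers never repeat the equals(...) check.
  boolean isEndOfStream() {
    return END_OF_STREAM_OFFSET.equals(offset);
  }

  String getPartition() {
    return partition;
  }

  String getOffset() {
    return offset;
  }

  Object getMessage() {
    return message;
  }
}
```

The method is non-private here for the same reason given above: code outside the envelope's package needs to call it.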


- Jagadish


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51346/#review147135
-----------------------------------------------------------


Re: Review Request 51346: SAMZA-974 - Support finite datasources in Samza that have a notion of End-Of-Stream

Posted by Jake Maes <ja...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51346/#review147135
-----------------------------------------------------------




samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java (line 79)
<https://reviews.apache.org/r/51346/#comment214244>

    Would it be better to move this method to IncomingMessageEnvelope?
    
    Then END_OF_STREAM_OFFSET could be made private and would be an interchangeable implementation detail. 
    
    It also means this END_OF_STREAM_OFFSET.equals(...) bit wouldn't need to be repeated in AsyncRunLoop.


- Jake Maes


Re: Review Request 51346: SAMZA-974 - Support finite datasources in Samza that have a notion of End-Of-Stream

Posted by Jagadish Venkatraman <ja...@gmail.com>.

> On Aug. 29, 2016, 11:54 p.m., Xinyu Liu wrote:
> > samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java, line 375
> > <https://reviews.apache.org/r/51346/diff/5/?file=1488333#file1488333line375>
> >
> >     No need to resume. This is running on the main thread.

Not calling resume() will stall the runLoop, causing an infinite wait. I verified this using a unit test.


> On Aug. 29, 2016, 11:54 p.m., Xinyu Liu wrote:
> > samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java, line 370
> > <https://reviews.apache.org/r/51346/diff/5/?file=1488333#file1488333line370>
> >
> >     Do we need to consider the auto-commit case here?

Talked with Xinyu and Yi offline. We agreed that auto-commit is nothing more than a periodic commit. It does not provide any guarantees that we'll commit at the end-of-stream. Users requiring end-of-stream commit should invoke `commit` in the end-of-stream-listener callback.
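
A minimal sketch of the pattern described above: a task that requests its final commit from the end-of-stream callback rather than relying on the periodic auto-commit. The types below are simplified stand-ins written for this illustration, not the actual Samza API; `SketchCoordinator`, `SketchEndOfStreamListener`, `onEndOfStream` and `commit()` are all assumed names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a task coordinator; only exposes commit().
interface SketchCoordinator {
    void commit();
}

// Hypothetical stand-in for the end-of-stream listener interface.
interface SketchEndOfStreamListener {
    void onEndOfStream(SketchCoordinator coordinator);
}

// A task that processes messages and commits explicitly at end of stream.
class CommitOnEndOfStreamTask implements SketchEndOfStreamListener {
    private final List<String> processed = new ArrayList<>();

    void process(String message) {
        processed.add(message);
    }

    @Override
    public void onEndOfStream(SketchCoordinator coordinator) {
        // Auto-commit is only periodic, so request the final commit here.
        coordinator.commit();
    }

    int processedCount() {
        return processed.size();
    }
}

// Test double that counts how many commits were requested.
class RecordingCoordinator implements SketchCoordinator {
    private int commits = 0;

    @Override
    public void commit() {
        commits++;
    }

    int commitCount() {
        return commits;
    }
}

class EndOfStreamCommitDemo {
    public static void main(String[] args) {
        CommitOnEndOfStreamTask task = new CommitOnEndOfStreamTask();
        RecordingCoordinator coordinator = new RecordingCoordinator();
        task.process("m1");
        task.process("m2");
        task.onEndOfStream(coordinator);
        System.out.println("processed=" + task.processedCount()
            + " commits=" + coordinator.commitCount());
    }
}
```

In this sketch the commit happens exactly once, when the callback fires, regardless of how the periodic commit interval is configured.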


- Jagadish


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51346/#review147245
-----------------------------------------------------------


On Aug. 30, 2016, 12:32 a.m., Jagadish Venkatraman wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/51346/
> -----------------------------------------------------------
> 
> (Updated Aug. 30, 2016, 12:32 a.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Yi Pan (Data Infrastructure), Navina Ramesh, and Xinyu Liu.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Samza currently works with unbounded data sources (kafka streams). However, for bounded data sources like HDFS files, snapshot files which are not infinite, we need a notion of 'end-of-stream'. 
> 
> This is a step towards realizing a 'finite' Samza job that terminates once data processing is complete.(as opposed to an infinite stream job that keeps running)  
> 
> RB changes:
> - New interface for EndOfStreamListener
> - New 'end-of-stream' state in the state-machine of AsyncStreamTask (Invariant: When end-of-stream is reached there are no buffered messages, no-callbacks are in-flight and no-window or commit call shall be in progress)
> - Changes to allow clean shut-downs of the tasks/container/job for end-of-stream.
> 
> Design Doc and Implementation Notes: https://issues.apache.org/jira/secure/attachment/12825119/ProposalforEndofStreaminSamza.pdf
> 
> 
> Diffs
> -----
> 
>   build.gradle 1d4eb74b1294318db8454631ddd0901596121ab2 
>   samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java cc860cf7eb4d514736913c1dceaa80534b61d71a 
>   samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java a8f858aa7e4f4ce436f450cf439fe1a102983c64 
>   samza-api/src/main/java/org/apache/samza/task/EndOfStreamListenerTask.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java a510bb0c5914c772438930d27f100b4d360c1296 
>   samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java 1fc645673b7547b642830df5639c0b4fcd11c0d5 
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 89f6857014489aba2db4129bc2e26dfec5b10652 
>   samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java ca913dea79fecbcecdfd1010dc794318055c5764 
> 
> Diff: https://reviews.apache.org/r/51346/diff/
> 
> 
> Testing
> -------
> 
> Unit tests to test scenarios for inorder processing, out-of-order processing and commit semantics.
> 
> 
> Thanks,
> 
> Jagadish Venkatraman
> 
>


Re: Review Request 51346: SAMZA-974 - Support finite datasources in Samza that have a notion of End-Of-Stream

Posted by Xinyu Liu <xi...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51346/#review147245
-----------------------------------------------------------


Fix it, then Ship it!




Generally looks great! Thanks a lot for cleaning up the logic! A few comments below.


samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 369)
<https://reviews.apache.org/r/51346/#comment214356>

    Do we need to consider the auto-commit case here?



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 374)
<https://reviews.apache.org/r/51346/#comment214357>

    No need to resume. This is running on the main thread.



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 610)
<https://reviews.apache.org/r/51346/#comment214358>

    This function can be removed since nothing invokes it anymore, right?


- Xinyu Liu


On Aug. 29, 2016, 11:47 p.m., Jagadish Venkatraman wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/51346/
> -----------------------------------------------------------
> 
> (Updated Aug. 29, 2016, 11:47 p.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Yi Pan (Data Infrastructure), Navina Ramesh, and Xinyu Liu.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Samza currently works with unbounded data sources (kafka streams). However, for bounded data sources like HDFS files, snapshot files which are not infinite, we need a notion of 'end-of-stream'. 
> 
> This is a step towards realizing a 'finite' Samza job that terminates once data processing is complete.(as opposed to an infinite stream job that keeps running)  
> 
> RB changes:
> - New interface for EndOfStreamListener
> - New 'end-of-stream' state in the state-machine of AsyncStreamTask (Invariant: When end-of-stream is reached there are no buffered messages, no-callbacks are in-flight and no-window or commit call shall be in progress)
> - Changes to allow clean shut-downs of the tasks/container/job for end-of-stream.
> 
> Design Doc and Implementation Notes: https://issues.apache.org/jira/secure/attachment/12825119/ProposalforEndofStreaminSamza.pdf
> 
> 
> Diffs
> -----
> 
>   build.gradle 1d4eb74b1294318db8454631ddd0901596121ab2 
>   samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java cc860cf7eb4d514736913c1dceaa80534b61d71a 
>   samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java a8f858aa7e4f4ce436f450cf439fe1a102983c64 
>   samza-api/src/main/java/org/apache/samza/task/EndOfStreamListenerTask.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java a510bb0c5914c772438930d27f100b4d360c1296 
>   samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java 1fc645673b7547b642830df5639c0b4fcd11c0d5 
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 89f6857014489aba2db4129bc2e26dfec5b10652 
>   samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java ca913dea79fecbcecdfd1010dc794318055c5764 
> 
> Diff: https://reviews.apache.org/r/51346/diff/
> 
> 
> Testing
> -------
> 
> Unit tests to test scenarios for inorder processing, out-of-order processing and commit semantics.
> 
> 
> Thanks,
> 
> Jagadish Venkatraman
> 
>


Re: Review Request 51346: SAMZA-974 - Support finite datasources in Samza that have a notion of End-Of-Stream

Posted by Jake Maes <ja...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51346/#review147248
-----------------------------------------------------------




samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java (line 79)
<https://reviews.apache.org/r/51346/#comment214359>

    Ahh, good point. I actually didn't see that you were trying to avoid adding internal details to the API. That's a good goal, but one could argue that the public constant is also public and even more confusing to users (as they may not know what it's for). 
    
    So, I suggest that either every aspect of endOfStream be hidden, or that it be encapsulated as much as possible to minimize the damage.
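
One way the encapsulation suggested above could look, as a rough sketch: the sentinel stays private, and callers only see a factory method and a predicate. `SketchEnvelope`, `endOfStream` and `isEndOfStream` are hypothetical names chosen for this illustration and are not taken from the patch; the sketch also assumes that no real offset ever equals the sentinel string.

```java
// Hypothetical envelope type; not the actual IncomingMessageEnvelope.
final class SketchEnvelope {
    // Sentinel kept private so users never compare against it directly.
    // Assumption: real offsets never take this value.
    private static final String END_OF_STREAM_OFFSET = "END_OF_STREAM";

    private final String partition;
    private final String offset;
    private final Object message;

    SketchEnvelope(String partition, String offset, Object message) {
        this.partition = partition;
        this.offset = offset;
        this.message = message;
    }

    // Factory that hides how end of stream is represented.
    static SketchEnvelope endOfStream(String partition) {
        return new SketchEnvelope(partition, END_OF_STREAM_OFFSET, null);
    }

    // The only end-of-stream detail visible to callers.
    boolean isEndOfStream() {
        return END_OF_STREAM_OFFSET.equals(offset);
    }

    String getPartition() {
        return partition;
    }

    Object getMessage() {
        return message;
    }
}

class SketchEnvelopeDemo {
    public static void main(String[] args) {
        SketchEnvelope regular = new SketchEnvelope("p0", "42", "payload");
        SketchEnvelope last = SketchEnvelope.endOfStream("p0");
        System.out.println(regular.isEndOfStream()); // false
        System.out.println(last.isEndOfStream());    // true
    }
}
```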


- Jake Maes


On Aug. 29, 2016, 11:47 p.m., Jagadish Venkatraman wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/51346/
> -----------------------------------------------------------
> 
> (Updated Aug. 29, 2016, 11:47 p.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Yi Pan (Data Infrastructure), Navina Ramesh, and Xinyu Liu.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Samza currently works with unbounded data sources (kafka streams). However, for bounded data sources like HDFS files, snapshot files which are not infinite, we need a notion of 'end-of-stream'. 
> 
> This is a step towards realizing a 'finite' Samza job that terminates once data processing is complete.(as opposed to an infinite stream job that keeps running)  
> 
> RB changes:
> - New interface for EndOfStreamListener
> - New 'end-of-stream' state in the state-machine of AsyncStreamTask (Invariant: When end-of-stream is reached there are no buffered messages, no-callbacks are in-flight and no-window or commit call shall be in progress)
> - Changes to allow clean shut-downs of the tasks/container/job for end-of-stream.
> 
> Design Doc and Implementation Notes: https://issues.apache.org/jira/secure/attachment/12825119/ProposalforEndofStreaminSamza.pdf
> 
> 
> Diffs
> -----
> 
>   build.gradle 1d4eb74b1294318db8454631ddd0901596121ab2 
>   samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java cc860cf7eb4d514736913c1dceaa80534b61d71a 
>   samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java a8f858aa7e4f4ce436f450cf439fe1a102983c64 
>   samza-api/src/main/java/org/apache/samza/task/EndOfStreamListenerTask.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java a510bb0c5914c772438930d27f100b4d360c1296 
>   samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java 1fc645673b7547b642830df5639c0b4fcd11c0d5 
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 89f6857014489aba2db4129bc2e26dfec5b10652 
>   samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java ca913dea79fecbcecdfd1010dc794318055c5764 
> 
> Diff: https://reviews.apache.org/r/51346/diff/
> 
> 
> Testing
> -------
> 
> Unit tests to test scenarios for inorder processing, out-of-order processing and commit semantics.
> 
> 
> Thanks,
> 
> Jagadish Venkatraman
> 
>


Re: Review Request 51346: SAMZA-974 - Support finite datasources in Samza that have a notion of End-Of-Stream

Posted by Jagadish Venkatraman <ja...@gmail.com>.

> On Sept. 8, 2016, 6:24 p.m., Xinyu Liu wrote:
> > samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala, line 210
> > <https://reviews.apache.org/r/51346/diff/7/?file=1491978#file1491978line210>
> >
> >     nit: no ";" at the end for scala

fixed


- Jagadish


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51346/#review148229
-----------------------------------------------------------


On Sept. 12, 2016, 9:25 p.m., Jagadish Venkatraman wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/51346/
> -----------------------------------------------------------
> 
> (Updated Sept. 12, 2016, 9:25 p.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Yi Pan (Data Infrastructure), Navina Ramesh, and Xinyu Liu.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Samza currently works with unbounded data sources (kafka streams). However, for bounded data sources like HDFS files, snapshot files which are not infinite, we need a notion of 'end-of-stream'. 
> 
> This is a step towards realizing a 'finite' Samza job that terminates once data processing is complete.(as opposed to an infinite stream job that keeps running)  
> 
> RB changes:
> - New interface for EndOfStreamListener
> - New 'end-of-stream' state in the state-machine of AsyncStreamTask (Invariant: When end-of-stream is reached there are no buffered messages, no-callbacks are in-flight and no-window or commit call shall be in progress)
> - Changes to allow clean shut-downs of the tasks/container/job for end-of-stream.
> 
> Design Doc and Implementation Notes: https://issues.apache.org/jira/secure/attachment/12825119/ProposalforEndofStreaminSamza.pdf
> 
> This RB depends on https://reviews.apache.org/r/51819/ and https://reviews.apache.org/r/51824/ (to be committed first)
> 
> 
> Diffs
> -----
> 
>   build.gradle 004c81e9173c22dee3e282aad2a58a34a3e46fe2 
>   checkstyle/checkstyle.xml 770b5e7f7a091198bbf53b3908600f9ac0caa4c7 
>   checkstyle/import-control.xml 7e77702bcd5c32f7fdaf1558337505993c1abe06 
>   gradle.properties 16e1f5d43f0415c511689480f8cb67d84e2baadf 
>   samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java cc860cf7eb4d514736913c1dceaa80534b61d71a 
>   samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java a8f858aa7e4f4ce436f450cf439fe1a102983c64 
>   samza-api/src/main/java/org/apache/samza/task/EndOfStreamListenerTask.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java 1fed2fb52eaaa27aed2bb79db55172d6f799506d 
>   samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java 39897c76ba8bb123e2ce6a878a891c17d75b99d6 
>   samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java a510bb0c5914c772438930d27f100b4d360c1296 
>   samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java 1fc645673b7547b642830df5639c0b4fcd11c0d5 
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 89f6857014489aba2db4129bc2e26dfec5b10652 
>   samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala a8355b944cad54faacf5eeb883d8f4b630440757 
>   samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java 57a5da6120d1fecc6b5f14db03123813a571f9b9 
>   samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java 9661885c1278c0d1cf53aae61f502ae719adfd51 
>   samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java d813fdcd17b963321df4a57708710c9e5a20aa92 
>   samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java ca913dea79fecbcecdfd1010dc794318055c5764 
>   samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala db2249ba73526ae1ca0a03eb5e497f2fe5cfed9c 
> 
> Diff: https://reviews.apache.org/r/51346/diff/
> 
> 
> Testing
> -------
> 
> Unit tests to test scenarios for inorder processing, out-of-order processing and commit semantics.
> 
> 
> Thanks,
> 
> Jagadish Venkatraman
> 
>


Re: Review Request 51346: SAMZA-974 - Support finite datasources in Samza that have a notion of End-Of-Stream

Posted by Xinyu Liu <xi...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51346/#review148229
-----------------------------------------------------------




samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala (line 207)
<https://reviews.apache.org/r/51346/#comment215701>

    nit: no ";" at the end for scala


- Xinyu Liu


On Sept. 6, 2016, 4:02 p.m., Jagadish Venkatraman wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/51346/
> -----------------------------------------------------------
> 
> (Updated Sept. 6, 2016, 4:02 p.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Yi Pan (Data Infrastructure), Navina Ramesh, and Xinyu Liu.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Samza currently works with unbounded data sources (kafka streams). However, for bounded data sources like HDFS files, snapshot files which are not infinite, we need a notion of 'end-of-stream'. 
> 
> This is a step towards realizing a 'finite' Samza job that terminates once data processing is complete.(as opposed to an infinite stream job that keeps running)  
> 
> RB changes:
> - New interface for EndOfStreamListener
> - New 'end-of-stream' state in the state-machine of AsyncStreamTask (Invariant: When end-of-stream is reached there are no buffered messages, no-callbacks are in-flight and no-window or commit call shall be in progress)
> - Changes to allow clean shut-downs of the tasks/container/job for end-of-stream.
> 
> Design Doc and Implementation Notes: https://issues.apache.org/jira/secure/attachment/12825119/ProposalforEndofStreaminSamza.pdf
> 
> 
> Diffs
> -----
> 
>   build.gradle 1d4eb74b1294318db8454631ddd0901596121ab2 
>   checkstyle/import-control.xml 7e77702bcd5c32f7fdaf1558337505993c1abe06 
>   samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java cc860cf7eb4d514736913c1dceaa80534b61d71a 
>   samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java a8f858aa7e4f4ce436f450cf439fe1a102983c64 
>   samza-api/src/main/java/org/apache/samza/task/EndOfStreamListenerTask.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java a510bb0c5914c772438930d27f100b4d360c1296 
>   samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java 1fc645673b7547b642830df5639c0b4fcd11c0d5 
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 89f6857014489aba2db4129bc2e26dfec5b10652 
>   samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala a8355b944cad54faacf5eeb883d8f4b630440757 
>   samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java ca913dea79fecbcecdfd1010dc794318055c5764 
> 
> Diff: https://reviews.apache.org/r/51346/diff/
> 
> 
> Testing
> -------
> 
> Unit tests to test scenarios for inorder processing, out-of-order processing and commit semantics.
> 
> 
> Thanks,
> 
> Jagadish Venkatraman
> 
>


Re: Review Request 51346: SAMZA-974 - Support finite datasources in Samza that have a notion of End-Of-Stream

Posted by Jagadish Venkatraman <ja...@gmail.com>.

> On Sept. 8, 2016, 5:03 p.m., Xinyu Liu wrote:
> > samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala, line 43
> > <https://reviews.apache.org/r/51346/diff/7/?file=1491978#file1491978line43>
> >
> >     seems this is only needed for unit test. Please move the code to scala test.

I've removed it. But in the process, I found that our scala compilation settings in gradle were broken. I have another patch SAMZA-1016 (https://reviews.apache.org/r/51824/) that fixes this.


> On Sept. 8, 2016, 5:03 p.m., Xinyu Liu wrote:
> > samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java, line 341
> > <https://reviews.apache.org/r/51346/diff/6-7/?file=1488341#file1488341line341>
> >
> >     I think we should use java8 stream here, something like allPartitions.stream().filter(p->!consumerMultiplexer.isEndOfStream(p)).toSet()

I've addressed your comment. 

But in the process, I found that our checkstyle settings for JDK 8 were broken. (This meant that builds would fail when using JDK 8 features like streams and lambdas.) I have a patch, SAMZA-1015 (https://reviews.apache.org/r/51819/), that fixes this.


- Jagadish


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51346/#review148210
-----------------------------------------------------------


On Sept. 12, 2016, 9:25 p.m., Jagadish Venkatraman wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/51346/
> -----------------------------------------------------------
> 
> (Updated Sept. 12, 2016, 9:25 p.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Yi Pan (Data Infrastructure), Navina Ramesh, and Xinyu Liu.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Samza currently works with unbounded data sources (kafka streams). However, for bounded data sources like HDFS files, snapshot files which are not infinite, we need a notion of 'end-of-stream'. 
> 
> This is a step towards realizing a 'finite' Samza job that terminates once data processing is complete.(as opposed to an infinite stream job that keeps running)  
> 
> RB changes:
> - New interface for EndOfStreamListener
> - New 'end-of-stream' state in the state-machine of AsyncStreamTask (Invariant: When end-of-stream is reached there are no buffered messages, no-callbacks are in-flight and no-window or commit call shall be in progress)
> - Changes to allow clean shut-downs of the tasks/container/job for end-of-stream.
> 
> Design Doc and Implementation Notes: https://issues.apache.org/jira/secure/attachment/12825119/ProposalforEndofStreaminSamza.pdf
> 
> This RB depends on https://reviews.apache.org/r/51819/ and https://reviews.apache.org/r/51824/ (to be committed first)
> 
> 
> Diffs
> -----
> 
>   build.gradle 004c81e9173c22dee3e282aad2a58a34a3e46fe2 
>   checkstyle/checkstyle.xml 770b5e7f7a091198bbf53b3908600f9ac0caa4c7 
>   checkstyle/import-control.xml 7e77702bcd5c32f7fdaf1558337505993c1abe06 
>   gradle.properties 16e1f5d43f0415c511689480f8cb67d84e2baadf 
>   samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java cc860cf7eb4d514736913c1dceaa80534b61d71a 
>   samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java a8f858aa7e4f4ce436f450cf439fe1a102983c64 
>   samza-api/src/main/java/org/apache/samza/task/EndOfStreamListenerTask.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java 1fed2fb52eaaa27aed2bb79db55172d6f799506d 
>   samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java 39897c76ba8bb123e2ce6a878a891c17d75b99d6 
>   samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java a510bb0c5914c772438930d27f100b4d360c1296 
>   samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java 1fc645673b7547b642830df5639c0b4fcd11c0d5 
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 89f6857014489aba2db4129bc2e26dfec5b10652 
>   samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala a8355b944cad54faacf5eeb883d8f4b630440757 
>   samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java 57a5da6120d1fecc6b5f14db03123813a571f9b9 
>   samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java 9661885c1278c0d1cf53aae61f502ae719adfd51 
>   samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java d813fdcd17b963321df4a57708710c9e5a20aa92 
>   samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java ca913dea79fecbcecdfd1010dc794318055c5764 
>   samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala db2249ba73526ae1ca0a03eb5e497f2fe5cfed9c 
> 
> Diff: https://reviews.apache.org/r/51346/diff/
> 
> 
> Testing
> -------
> 
> Unit tests to test scenarios for inorder processing, out-of-order processing and commit semantics.
> 
> 
> Thanks,
> 
> Jagadish Venkatraman
> 
>


Re: Review Request 51346: SAMZA-974 - Support finite datasources in Samza that have a notion of End-Of-Stream

Posted by Xinyu Liu <xi...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51346/#review148210
-----------------------------------------------------------




samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 340)
<https://reviews.apache.org/r/51346/#comment215658>

    I think we should use java8 stream here, something like allPartitions.stream().filter(p->!consumerMultiplexer.isEndOfStream(p)).toSet()
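
For reference, a self-contained sketch of the stream-based filter suggested above. Java 8 streams have no `toSet()` method on `Stream` itself, so the sketch collects with `Collectors.toSet()`. Partitions are modelled as plain strings and the end-of-stream check is passed in as a `Predicate`; both are simplifications assumed for this example, and `ActivePartitionFilter` is a hypothetical name.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class ActivePartitionFilter {
    // Returns the partitions that have not yet reached end of stream.
    static Set<String> activePartitions(Set<String> allPartitions,
                                        Predicate<String> isEndOfStream) {
        return allPartitions.stream()
            .filter(p -> !isEndOfStream.test(p))
            .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        Set<String> all = new HashSet<>(Arrays.asList("p0", "p1", "p2"));
        Set<String> ended = new HashSet<>(Arrays.asList("p1"));
        // Prints the partitions still being consumed (p0 and p2, in any order).
        System.out.println(activePartitions(all, ended::contains));
    }
}
```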



samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala (line 41)
<https://reviews.apache.org/r/51346/#comment215659>

    seems this is only needed for unit test. Please move the code to scala test.


- Xinyu Liu


On Sept. 6, 2016, 4:02 p.m., Jagadish Venkatraman wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/51346/
> -----------------------------------------------------------
> 
> (Updated Sept. 6, 2016, 4:02 p.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Yi Pan (Data Infrastructure), Navina Ramesh, and Xinyu Liu.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Samza currently works with unbounded data sources (kafka streams). However, for bounded data sources like HDFS files, snapshot files which are not infinite, we need a notion of 'end-of-stream'. 
> 
> This is a step towards realizing a 'finite' Samza job that terminates once data processing is complete.(as opposed to an infinite stream job that keeps running)  
> 
> RB changes:
> - New interface for EndOfStreamListener
> - New 'end-of-stream' state in the state-machine of AsyncStreamTask (Invariant: When end-of-stream is reached there are no buffered messages, no-callbacks are in-flight and no-window or commit call shall be in progress)
> - Changes to allow clean shut-downs of the tasks/container/job for end-of-stream.
> 
> Design Doc and Implementation Notes: https://issues.apache.org/jira/secure/attachment/12825119/ProposalforEndofStreaminSamza.pdf
> 
> 
> Diffs
> -----
> 
>   build.gradle 1d4eb74b1294318db8454631ddd0901596121ab2 
>   checkstyle/import-control.xml 7e77702bcd5c32f7fdaf1558337505993c1abe06 
>   samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java cc860cf7eb4d514736913c1dceaa80534b61d71a 
>   samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java a8f858aa7e4f4ce436f450cf439fe1a102983c64 
>   samza-api/src/main/java/org/apache/samza/task/EndOfStreamListenerTask.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java a510bb0c5914c772438930d27f100b4d360c1296 
>   samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java 1fc645673b7547b642830df5639c0b4fcd11c0d5 
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 89f6857014489aba2db4129bc2e26dfec5b10652 
>   samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala a8355b944cad54faacf5eeb883d8f4b630440757 
>   samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java ca913dea79fecbcecdfd1010dc794318055c5764 
> 
> Diff: https://reviews.apache.org/r/51346/diff/
> 
> 
> Testing
> -------
> 
> Unit tests to test scenarios for inorder processing, out-of-order processing and commit semantics.
> 
> 
> Thanks,
> 
> Jagadish Venkatraman
> 
>


Re: Review Request 51346: SAMZA-974 - Support finite datasources in Samza that have a notion of End-Of-Stream

Posted by Jagadish Venkatraman <ja...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51346/
-----------------------------------------------------------

(Updated Oct. 1, 2016, 6:05 a.m.)


Review request for samza, Boris Shkolnik, Chris Pettitt, Yi Pan (Data Infrastructure), Navina Ramesh, and Xinyu Liu.


Changes
-------

rebased with latest.


Repository: samza


Description
-------

Samza currently works with unbounded data sources (Kafka streams). However, for bounded data sources such as HDFS files and snapshot files, which are finite, we need a notion of 'end-of-stream'.

This is a step towards realizing a 'finite' Samza job that terminates once data processing is complete (as opposed to an infinite stream job that keeps running).

RB changes:
- New interface for EndOfStreamListener
- New 'end-of-stream' state in the state-machine of AsyncStreamTask (Invariant: when end-of-stream is reached, there are no buffered messages, no callbacks are in flight, and no window or commit call is in progress)
- Changes to allow clean shut-downs of the tasks/container/job for end-of-stream.
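
The invariant in the list above can be illustrated with a small bookkeeping sketch. This is not the AsyncRunLoop implementation; the class, its counters and its method names are hypothetical and only show how the four conditions combine.

```java
// Hypothetical per-task bookkeeping; not the actual AsyncRunLoop code.
class EndOfStreamStateSketch {
    private int bufferedMessages = 0;   // fetched but not yet dispatched
    private int callbacksInFlight = 0;  // dispatched, callback not yet completed
    private boolean windowOrCommitInProgress = false;
    private boolean allPartitionsAtEnd = false;

    void onMessageBuffered() {
        bufferedMessages++;
    }

    void onMessageDispatched() {
        bufferedMessages--;
        callbacksInFlight++;
    }

    void onCallbackComplete() {
        callbacksInFlight--;
    }

    void setWindowOrCommitInProgress(boolean inProgress) {
        windowOrCommitInProgress = inProgress;
    }

    void onAllPartitionsAtEnd() {
        allPartitionsAtEnd = true;
    }

    // True only when every condition of the invariant holds.
    boolean isReadyForEndOfStream() {
        return allPartitionsAtEnd
            && bufferedMessages == 0
            && callbacksInFlight == 0
            && !windowOrCommitInProgress;
    }
}

class EndOfStreamStateDemo {
    public static void main(String[] args) {
        EndOfStreamStateSketch state = new EndOfStreamStateSketch();
        state.onMessageBuffered();
        state.onAllPartitionsAtEnd();
        System.out.println(state.isReadyForEndOfStream()); // false: a message is still buffered
        state.onMessageDispatched();
        state.onCallbackComplete();
        System.out.println(state.isReadyForEndOfStream()); // true: nothing is pending
    }
}
```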

Design Doc and Implementation Notes: https://issues.apache.org/jira/secure/attachment/12825119/ProposalforEndofStreaminSamza.pdf

This RB depends on https://reviews.apache.org/r/51819/ and https://reviews.apache.org/r/51824/ (to be committed first)


Diffs (updated)
-----

  build.gradle ab257d3603c8f3ffefcd0772e0cdfd271ff8e77f 
  checkstyle/import-control.xml 7e77702bcd5c32f7fdaf1558337505993c1abe06 
  samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java cc860cf7eb4d514736913c1dceaa80534b61d71a 
  samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java a8f858aa7e4f4ce436f450cf439fe1a102983c64 
  samza-api/src/main/java/org/apache/samza/task/EndOfStreamListenerTask.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java 39897c76ba8bb123e2ce6a878a891c17d75b99d6 
  samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java 9a21bf15a2d1a21ec3339c3a009a928d19d428df 
  samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java 1fc645673b7547b642830df5639c0b4fcd11c0d5 
  samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 89f6857014489aba2db4129bc2e26dfec5b10652 
  samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala a8355b944cad54faacf5eeb883d8f4b630440757 
  samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java 57a5da6120d1fecc6b5f14db03123813a571f9b9 
  samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java ca913dea79fecbcecdfd1010dc794318055c5764 
  samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala db2249ba73526ae1ca0a03eb5e497f2fe5cfed9c 

Diff: https://reviews.apache.org/r/51346/diff/


Testing
-------

Unit tests to test scenarios for inorder processing, out-of-order processing and commit semantics.


Thanks,

Jagadish Venkatraman


Re: Review Request 51346: SAMZA-974 - Support finite datasources in Samza that have a notion of End-Of-Stream

Posted by Jagadish Venkatraman <ja...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51346/
-----------------------------------------------------------

(Updated Sept. 12, 2016, 9:25 p.m.)


Review request for samza, Boris Shkolnik, Chris Pettitt, Yi Pan (Data Infrastructure), Navina Ramesh, and Xinyu Liu.


Changes
-------

In the process of addressing [~xiliu]'s feedback, I found two gaps:
1. Our checkstyle checks are not compatible with JDK 8. (SAMZA-1015)
2. Our gradle settings are not set up correctly for joint compilation. (SAMZA-1016)


Repository: samza


Description (updated)
-------

Samza currently works with unbounded data sources (Kafka streams). However, for bounded data sources such as HDFS files and snapshot files, which are finite, we need a notion of 'end-of-stream'.

This is a step towards realizing a 'finite' Samza job that terminates once data processing is complete (as opposed to an infinite stream job that keeps running).

RB changes:
- New interface for EndOfStreamListener
- New 'end-of-stream' state in the state-machine of AsyncStreamTask (Invariant: When end-of-stream is reached there are no buffered messages, no-callbacks are in-flight and no-window or commit call shall be in progress)
- Changes to allow clean shut-downs of the tasks/container/job for end-of-stream.
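
The invariant in the second bullet can be read as a simple predicate over per-task state. The following is only a minimal sketch under that reading; `TaskState` and all of its fields and methods are hypothetical names chosen for illustration and are not taken from the AsyncRunLoop code in this RB:

```java
// Hypothetical sketch of the end-of-stream invariant; names are illustrative only.
final class TaskState {
  private int bufferedMessages;             // envelopes fetched but not yet dispatched
  private int callbacksInFlight;            // process() calls whose callback has not completed
  private boolean windowOrCommitInProgress; // true while a window or commit call is running
  private boolean endOfStreamSeen;          // true once the end-of-stream envelope has arrived

  void onEnvelopeBuffered() { bufferedMessages++; }

  void onDispatch() {
    bufferedMessages--;
    callbacksInFlight++;
  }

  void onCallbackComplete() { callbacksInFlight--; }

  void setWindowOrCommitInProgress(boolean inProgress) { windowOrCommitInProgress = inProgress; }

  void onEndOfStreamEnvelope() { endOfStreamSeen = true; }

  // The task may enter the end-of-stream state only when nothing is pending.
  boolean isReadyForEndOfStream() {
    return endOfStreamSeen
        && bufferedMessages == 0
        && callbacksInFlight == 0
        && !windowOrCommitInProgress;
  }
}
```

In this sketch a task that has seen the end-of-stream envelope still waits for outstanding callbacks to complete before it reports ready.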

Design Doc and Implementation Notes: https://issues.apache.org/jira/secure/attachment/12825119/ProposalforEndofStreaminSamza.pdf

This RB depends on https://reviews.apache.org/r/51819/ and https://reviews.apache.org/r/51824/ (to be committed first)


Diffs (updated)
-----

  build.gradle 004c81e9173c22dee3e282aad2a58a34a3e46fe2 
  checkstyle/checkstyle.xml 770b5e7f7a091198bbf53b3908600f9ac0caa4c7 
  checkstyle/import-control.xml 7e77702bcd5c32f7fdaf1558337505993c1abe06 
  gradle.properties 16e1f5d43f0415c511689480f8cb67d84e2baadf 
  samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java cc860cf7eb4d514736913c1dceaa80534b61d71a 
  samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java a8f858aa7e4f4ce436f450cf439fe1a102983c64 
  samza-api/src/main/java/org/apache/samza/task/EndOfStreamListenerTask.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java 1fed2fb52eaaa27aed2bb79db55172d6f799506d 
  samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java 39897c76ba8bb123e2ce6a878a891c17d75b99d6 
  samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java a510bb0c5914c772438930d27f100b4d360c1296 
  samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java 1fc645673b7547b642830df5639c0b4fcd11c0d5 
  samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 89f6857014489aba2db4129bc2e26dfec5b10652 
  samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala a8355b944cad54faacf5eeb883d8f4b630440757 
  samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java 57a5da6120d1fecc6b5f14db03123813a571f9b9 
  samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java 9661885c1278c0d1cf53aae61f502ae719adfd51 
  samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java d813fdcd17b963321df4a57708710c9e5a20aa92 
  samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java ca913dea79fecbcecdfd1010dc794318055c5764 
  samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala db2249ba73526ae1ca0a03eb5e497f2fe5cfed9c 

Diff: https://reviews.apache.org/r/51346/diff/


Testing
-------

Unit tests cover in-order processing, out-of-order processing, and commit semantics.


Thanks,

Jagadish Venkatraman


Re: Review Request 51346: SAMZA-974 - Support finite datasources in Samza that have a notion of End-Of-Stream

Posted by Jagadish Venkatraman <ja...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51346/
-----------------------------------------------------------

(Updated Sept. 6, 2016, 4:02 p.m.)


Review request for samza, Boris Shkolnik, Chris Pettitt, Yi Pan (Data Infrastructure), Navina Ramesh, and Xinyu Liu.


Repository: samza


Description
-------

Samza currently works with unbounded data sources (Kafka streams). However, for bounded data sources such as HDFS files and snapshot files, which are finite, we need a notion of 'end-of-stream'.

This is a step towards realizing a 'finite' Samza job that terminates once data processing is complete (as opposed to an infinite stream job that keeps running).

RB changes:
- New interface for EndOfStreamListener
- New 'end-of-stream' state in the state machine of AsyncStreamTask (Invariant: when end-of-stream is reached, there are no buffered messages, no callbacks are in flight, and no window or commit call is in progress)
- Changes to allow clean shut-downs of the tasks/container/job for end-of-stream.

Design Doc and Implementation Notes: https://issues.apache.org/jira/secure/attachment/12825119/ProposalforEndofStreaminSamza.pdf


Diffs (updated)
-----

  build.gradle 1d4eb74b1294318db8454631ddd0901596121ab2 
  checkstyle/import-control.xml 7e77702bcd5c32f7fdaf1558337505993c1abe06 
  samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java cc860cf7eb4d514736913c1dceaa80534b61d71a 
  samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java a8f858aa7e4f4ce436f450cf439fe1a102983c64 
  samza-api/src/main/java/org/apache/samza/task/EndOfStreamListenerTask.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java a510bb0c5914c772438930d27f100b4d360c1296 
  samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java 1fc645673b7547b642830df5639c0b4fcd11c0d5 
  samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 89f6857014489aba2db4129bc2e26dfec5b10652 
  samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala a8355b944cad54faacf5eeb883d8f4b630440757 
  samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java ca913dea79fecbcecdfd1010dc794318055c5764 

Diff: https://reviews.apache.org/r/51346/diff/


Testing
-------

Unit tests cover in-order processing, out-of-order processing, and commit semantics.


Thanks,

Jagadish Venkatraman


Re: Review Request 51346: SAMZA-974 - Support finite datasources in Samza that have a notion of End-Of-Stream

Posted by Jagadish Venkatraman <ja...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51346/
-----------------------------------------------------------

(Updated Aug. 30, 2016, 12:32 a.m.)


Review request for samza, Boris Shkolnik, Chris Pettitt, Yi Pan (Data Infrastructure), Navina Ramesh, and Xinyu Liu.


Repository: samza


Description
-------

Samza currently works with unbounded data sources (Kafka streams). However, for bounded data sources such as HDFS files and snapshot files, which are finite, we need a notion of 'end-of-stream'.

This is a step towards realizing a 'finite' Samza job that terminates once data processing is complete (as opposed to an infinite stream job that keeps running).

RB changes:
- New interface for EndOfStreamListener
- New 'end-of-stream' state in the state machine of AsyncStreamTask (Invariant: when end-of-stream is reached, there are no buffered messages, no callbacks are in flight, and no window or commit call is in progress)
- Changes to allow clean shut-downs of the tasks/container/job for end-of-stream.

Design Doc and Implementation Notes: https://issues.apache.org/jira/secure/attachment/12825119/ProposalforEndofStreaminSamza.pdf


Diffs (updated)
-----

  build.gradle 1d4eb74b1294318db8454631ddd0901596121ab2 
  samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java cc860cf7eb4d514736913c1dceaa80534b61d71a 
  samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java a8f858aa7e4f4ce436f450cf439fe1a102983c64 
  samza-api/src/main/java/org/apache/samza/task/EndOfStreamListenerTask.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java a510bb0c5914c772438930d27f100b4d360c1296 
  samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java 1fc645673b7547b642830df5639c0b4fcd11c0d5 
  samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 89f6857014489aba2db4129bc2e26dfec5b10652 
  samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java ca913dea79fecbcecdfd1010dc794318055c5764 

Diff: https://reviews.apache.org/r/51346/diff/


Testing
-------

Unit tests cover in-order processing, out-of-order processing, and commit semantics.


Thanks,

Jagadish Venkatraman


Re: Review Request 51346: SAMZA-974 - Support finite datasources in Samza that have a notion of End-Of-Stream

Posted by Jagadish Venkatraman <ja...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51346/
-----------------------------------------------------------

(Updated Aug. 29, 2016, 11:47 p.m.)


Review request for samza, Boris Shkolnik, Chris Pettitt, Yi Pan (Data Infrastructure), Navina Ramesh, and Xinyu Liu.


Changes
-------

- Address Chris's, Xinyu's, and Jake's review feedback.


Repository: samza


Description
-------

Samza currently works with unbounded data sources (Kafka streams). However, for bounded data sources such as HDFS files and snapshot files, which are finite, we need a notion of 'end-of-stream'.

This is a step towards realizing a 'finite' Samza job that terminates once data processing is complete (as opposed to an infinite stream job that keeps running).

RB changes:
- New interface for EndOfStreamListener
- New 'end-of-stream' state in the state machine of AsyncStreamTask (Invariant: when end-of-stream is reached, there are no buffered messages, no callbacks are in flight, and no window or commit call is in progress)
- Changes to allow clean shut-downs of the tasks/container/job for end-of-stream.

Design Doc and Implementation Notes: https://issues.apache.org/jira/secure/attachment/12825119/ProposalforEndofStreaminSamza.pdf


Diffs (updated)
-----

  build.gradle 1d4eb74b1294318db8454631ddd0901596121ab2 
  samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java cc860cf7eb4d514736913c1dceaa80534b61d71a 
  samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java a8f858aa7e4f4ce436f450cf439fe1a102983c64 
  samza-api/src/main/java/org/apache/samza/task/EndOfStreamListenerTask.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java a510bb0c5914c772438930d27f100b4d360c1296 
  samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java 1fc645673b7547b642830df5639c0b4fcd11c0d5 
  samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 89f6857014489aba2db4129bc2e26dfec5b10652 
  samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java ca913dea79fecbcecdfd1010dc794318055c5764 

Diff: https://reviews.apache.org/r/51346/diff/


Testing
-------

Unit tests cover in-order processing, out-of-order processing, and commit semantics.


Thanks,

Jagadish Venkatraman


Re: Review Request 51346: SAMZA-974 - Support finite datasources in Samza that have a notion of End-Of-Stream

Posted by Jagadish Venkatraman <ja...@gmail.com>.

> On Aug. 26, 2016, 7:51 p.m., Chris Pettitt wrote:
> > samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java, line 31
> > <https://reviews.apache.org/r/51346/diff/4/?file=1486375#file1486375line31>
> >
> >     How likely are we to collide with this? That's the problem with using a user-definable token. I see two options:
> >     
> >     If null is not supported (and thus not usable by user-defined implementations) I would use that and mark it as reserved.
> >     
> >     Otherwise I would probably do something more to make this unlikely to collide (call me paranoid). Something like use a NUL byte as the first character and document that offsets with such an encoding are reserved. I would also check that this sort of string doesn't make it to user code in the task.
> 
> Jagadish Venkatraman wrote:
>     Returning a null is not possible (because a null offset could mean that we don't have messages at this moment instead of meaning end-of-stream. While we should poll again when a consumer returns null, we should not for the END_OF_STREAM case.) Hence, I was hoping to use a special offset.
>     
>     I like your suggestion of using a NUL byte as the first character (and calling that out). I'll update the RB with that.
> 
> Jagadish Venkatraman wrote:
>     There seem to be interoperability issues between strings in Java and strings in Scala (especially around handling NUL bytes in the string; Scala appears to strip out NUL bytes). Hence, I've used "SAMZA_INTERNAL_END_OF_STREAM" as the string. Let me know if there's a better way to handle this.
> 
> Chris Pettitt wrote:
>     Scala should definitely not be dropping any bytes. How could it safely do so?
>     
>     FWIW, you can verify:
>     
>     ```
>     % scala
>     scala> "\u0000".length
>     res0: Int = 1
>     ```

I dug into the problem further.

Case 1: `public static final String STR = "\0END_OF_STREAM";`

When this Java constant is accessed from Scala code, it appears to be encoded as
STR[0] = 65533, STR[1] = 65533, STR[2] = 'E', STR[3] = 'N', and so on. When accessed from Java, it is correctly encoded with STR[0] = '\0'.

Case 2 (the current updated RB, which solves this):
  `private static final byte[] END_OF_STREAM_BYTES = "\0END_OF_STREAM".getBytes();`
  `public static final String STR = new String(END_OF_STREAM_BYTES, Charset.defaultCharset());`

When `STR` is now accessed from Scala code, it appears to be encoded (correctly) as
STR[0] = '\0', STR[1] = 'E', STR[2] = 'N', STR[3] = 'D', and so on.

While Case 2 fixes this, I wonder how portable the solution is. Do you have insight into what could go on with different encodings?
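
On the portability question, note that 65533 is U+FFFD, the Unicode replacement character, which points to a decoding problem rather than to bytes being stripped. One possible explanation (an assumption, not verified here) is that Case 1 is a compile-time constant stored in the class file, where U+0000 is written as the two bytes 0xC0 0x80, and that those two bytes are being decoded as plain UTF-8. A minimal sketch of Case 2 with the charset pinned, so that it no longer depends on the platform default used by `getBytes()` and `Charset.defaultCharset()`, might look like this. `EndOfStreamOffset` and `isEndOfStream` are hypothetical names used only for illustration:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch; not the code in the RB.
final class EndOfStreamOffset {
  // Built at class-initialization time (not a compile-time constant), mirroring Case 2.
  // The charset is explicit so the result does not depend on the platform default.
  private static final byte[] END_OF_STREAM_BYTES =
      "\0END_OF_STREAM".getBytes(StandardCharsets.UTF_8);

  static final String END_OF_STREAM_OFFSET =
      new String(END_OF_STREAM_BYTES, StandardCharsets.UTF_8);

  // True only for the reserved offset.
  static boolean isEndOfStream(String offset) {
    return END_OF_STREAM_OFFSET.equals(offset);
  }

  public static void main(String[] args) {
    System.out.println((int) END_OF_STREAM_OFFSET.charAt(0)); // prints 0
    System.out.println(END_OF_STREAM_OFFSET.length());        // prints 14
  }
}
```

Because UTF-8 encodes U+0000 as a single zero byte, the round trip through `END_OF_STREAM_BYTES` preserves the leading NUL on any JVM.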


- Jagadish


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51346/#review147015
-----------------------------------------------------------




Re: Review Request 51346: SAMZA-974 - Support finite datasources in Samza that have a notion of End-Of-Stream

Posted by Jagadish Venkatraman <ja...@gmail.com>.

> On Aug. 26, 2016, 7:51 p.m., Chris Pettitt wrote:
> > samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java, line 31
> > <https://reviews.apache.org/r/51346/diff/4/?file=1486375#file1486375line31>
> >
> >     How likely are we to collide with this? That's the problem with using a user-definable token. I see two options:
> >     
> >     If null is not supported (and thus not usable by user-defined implementations) I would use that and mark it as reserved.
> >     
> >     Otherwise I would probably do something more to make this unlikely to collide (call me paranoid). Something like use a NUL byte as the first character and document that offsets with such an encoding are reserved. I would also check that this sort of string doesn't make it to user code in the task.

Returning null is not possible, because a null offset could mean that we have no messages at this moment rather than end-of-stream. We should poll again when a consumer returns null, but not in the END_OF_STREAM case. Hence, I was hoping to use a special offset.

I like your suggestion of using a NUL byte as the first character (and calling that out). I'll update the RB with that.
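
To illustrate why null and end-of-stream need different handling, here is a minimal sketch of a poll loop. It assumes a reserved offset that starts with a NUL character; `PollSketch`, `Consumer`, `pollOffset` and `drain` are hypothetical names, not Samza APIs:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; names are illustrative and not part of Samza.
final class PollSketch {
  // Placeholder for the reserved offset (NUL as the first character).
  static final String END_OF_STREAM_OFFSET = "\0END_OF_STREAM";

  // Stand-in for a consumer; returns null when nothing is available right now.
  interface Consumer {
    String pollOffset();
  }

  // Polls up to maxPolls times and returns the regular offsets that were seen.
  static List<String> drain(Consumer consumer, int maxPolls) {
    List<String> seen = new ArrayList<>();
    for (int i = 0; i < maxPolls; i++) {
      String offset = consumer.pollOffset();
      if (offset == null) {
        continue; // no message at this moment: poll again
      }
      if (END_OF_STREAM_OFFSET.equals(offset)) {
        break; // end of stream: never poll this source again
      }
      seen.add(offset);
    }
    return seen;
  }
}
```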


> On Aug. 26, 2016, 7:51 p.m., Chris Pettitt wrote:
> > samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java, line 60
> > <https://reviews.apache.org/r/51346/diff/4/?file=1486376#file1486376line60>
> >
> >     Shouldn't this case be flagged as an error at the point at which this is detected during enqueue and then dropped? The benefit is that we have fewer states to worry about at layers like these and we also get a nice stack trace where the error originated.

I'm not sure I completely understand the part about "detecting during enqueue and dropping it". Buffering happens at every next/hasNext call, so an enqueue of an end-of-stream is not necessarily an error.

- I've refactored how the `endOfStreamReached` flag is manipulated in the new draft. Hopefully, this refactoring makes the handling of states simpler.
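
As a rough illustration of a buffering iterator that stops fetching once end-of-stream is seen, consider the sketch below. It is not the SystemStreamPartitionIterator code in the RB: `EndOfStreamAwareIterator`, its `fetcher` supplier, and the use of plain strings in place of envelopes are all assumptions made to keep the example self-contained:

```java
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.function.Supplier;

// Hypothetical sketch; strings stand in for message envelopes.
final class EndOfStreamAwareIterator implements Iterator<String> {
  static final String END_OF_STREAM = "\0END_OF_STREAM"; // placeholder sentinel

  private final Supplier<List<String>> fetcher; // returns the next batch, possibly empty
  private final Queue<String> buffer = new ArrayDeque<>();
  private boolean endOfStreamReached = false;

  EndOfStreamAwareIterator(Supplier<List<String>> fetcher) {
    this.fetcher = fetcher;
  }

  @Override
  public boolean hasNext() {
    refill();
    return !buffer.isEmpty();
  }

  @Override
  public String next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return buffer.remove();
  }

  // Fetches a new batch only when the buffer is empty and end-of-stream has not been seen.
  private void refill() {
    if (!buffer.isEmpty() || endOfStreamReached) {
      return;
    }
    for (String message : fetcher.get()) {
      if (END_OF_STREAM.equals(message)) {
        endOfStreamReached = true;
        break; // anything after the sentinel in this batch is ignored
      }
      buffer.add(message);
    }
  }
}
```

With a single flag set in one place, `hasNext` and `next` do not need their own end-of-stream branches.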


> On Aug. 26, 2016, 7:51 p.m., Chris Pettitt wrote:
> > samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java, line 90
> > <https://reviews.apache.org/r/51346/diff/4/?file=1486376#file1486376line90>
> >
> >     OK, I take back the part about a nice stack trace... You could at least log the consumer that gave you back an end of stream followed by a message.

I've logged the consumer that gave back end-of-stream. Thanks for the suggestion.


> On Aug. 26, 2016, 7:51 p.m., Chris Pettitt wrote:
> > samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java, line 67
> > <https://reviews.apache.org/r/51346/diff/4/?file=1486376#file1486376line67>
> >
> >     Same comment as above.

I've refactored how the `endOfStreamReached` flag is manipulated in the new draft. Hopefully, this refactoring makes the handling of states simpler.


> On Aug. 26, 2016, 7:51 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java, lines 364-366
> > <https://reviews.apache.org/r/51346/diff/4/?file=1486378#file1486378line364>
> >
> >     We're distributing this end of stream knowledge in a few places. It actually looks totally unnecessary here? I think you could always call task.endOfStream and let the task decide if it needs to do anything as a result.

Good observation! I've fixed this one in the new draft.


> On Aug. 26, 2016, 7:51 p.m., Chris Pettitt wrote:
> > samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala, line 174
> > <https://reviews.apache.org/r/51346/diff/4/?file=1486379#file1486379line174>
> >
> >     I would just check if you can coerce here. I don't think adding a second representation of that state is buying us much (and it would be bad for it to diverge from the truth).

I tried to be consistent with the rest of the Samza code base (this is how we handle Windowable, Initable, Closeable and Async tasks as well). We log the type of the task in the `detailedString()` function (in the revised RB), so there may be value in having a separate function.

I'm not sure how the representation may diverge (as the flags are immutable and set during construction).
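
A small sketch of the point about immutability: the flag is derived once from the task's type at construction, so it is equivalent to coercing at the call site. The interfaces below are simplified stand-ins and not the real Samza signatures; `CoerceSketch` and `TaskWrapper` are hypothetical names:

```java
// Hypothetical sketch with simplified stand-in interfaces.
final class CoerceSketch {
  interface StreamTask {
    void process(String message);
  }

  interface EndOfStreamListenerTask {
    void onEndOfStream();
  }

  static final class TaskWrapper {
    private final StreamTask task;
    // Set once at construction and never reassigned, so it cannot drift from the task's type.
    final boolean isEndOfStreamListenerTask;

    TaskWrapper(StreamTask task) {
      this.task = task;
      this.isEndOfStreamListenerTask = task instanceof EndOfStreamListenerTask;
    }

    // Notifies the task if it listens for end-of-stream; returns whether it was notified.
    boolean endOfStream() {
      if (isEndOfStreamListenerTask) {
        ((EndOfStreamListenerTask) task).onEndOfStream();
        return true;
      }
      return false;
    }

    // The flag is also available for logging the task type.
    String detailedString() {
      return "TaskWrapper(endOfStreamListener=" + isEndOfStreamListenerTask + ")";
    }
  }
}
```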


> On Aug. 26, 2016, 7:51 p.m., Chris Pettitt wrote:
> > samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java, line 363
> > <https://reviews.apache.org/r/51346/diff/4/?file=1486380#file1486380line363>
> >
> >     This line is a bit long, you could break it up like this, which arguably improves readability:
> >     
> >     ```
> >     when(consumerMultiplexer.choose(false))
> >         .thenReturn(envelope0)
> >         .thenReturn(envelope1)
> >         ...
> >     ```

This is good feedback on readability. Thanks for the input :)


> On Aug. 26, 2016, 7:51 p.m., Chris Pettitt wrote:
> > samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java, line 371
> > <https://reviews.apache.org/r/51346/diff/4/?file=1486380#file1486380line371>
> >
> >     Interesting - should end of stream actually count as a message?

The `processes` metric measures how many times we invoked `process` on the `StreamTask` implementation. Since we don't pass the endOfStream message to user code, we will not invoke `process` for those messages. End-of-stream messages will be purely internal to Samza.
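
A minimal sketch of that counting rule, assuming strings in place of envelopes and a placeholder sentinel. `ProcessCounterSketch` and its nested `StreamTask` are hypothetical and much simpler than the real run loop:

```java
import java.util.List;

// Hypothetical sketch; not the AsyncRunLoop implementation.
final class ProcessCounterSketch {
  static final String END_OF_STREAM = "\0END_OF_STREAM"; // placeholder sentinel

  interface StreamTask {
    void process(String message);
  }

  // Returns the value the `processes` counter would have after handling the envelopes.
  static int run(List<String> envelopes, StreamTask task) {
    int processes = 0;
    for (String envelope : envelopes) {
      if (END_OF_STREAM.equals(envelope)) {
        continue; // internal marker: never handed to user code, never counted
      }
      task.process(envelope);
      processes++;
    }
    return processes;
  }
}
```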


- Jagadish


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51346/#review147015
-----------------------------------------------------------


On Aug. 25, 2016, 11:52 p.m., Jagadish Venkatraman wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/51346/
> -----------------------------------------------------------
> 
> (Updated Aug. 25, 2016, 11:52 p.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Yi Pan (Data Infrastructure), Navina Ramesh, and Xinyu Liu.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Samza currently works with unbounded data sources (Kafka streams). However, for bounded data sources such as HDFS files and snapshot files, which are finite, we need a notion of 'end-of-stream'.
> 
> This is a step towards realizing a 'finite' Samza job that terminates once data processing is complete (as opposed to an infinite stream job that keeps running).
> 
> RB changes:
> - New interface for EndOfStreamListener
> - New 'end-of-stream' state in the state machine of AsyncStreamTask (Invariant: when end-of-stream is reached, there are no buffered messages, no callbacks are in flight, and no window or commit call is in progress)
> - Changes to allow clean shut-downs of the tasks/container/job for end-of-stream.
> 
> Design Doc and Implementation Notes: https://issues.apache.org/jira/secure/attachment/12825119/ProposalforEndofStreaminSamza.pdf
> 
> 
> Diffs
> -----
> 
>   samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java cc860cf7eb4d514736913c1dceaa80534b61d71a 
>   samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java a8f858aa7e4f4ce436f450cf439fe1a102983c64 
>   samza-api/src/main/java/org/apache/samza/task/EndOfStreamListenerTask.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java a510bb0c5914c772438930d27f100b4d360c1296 
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 89f6857014489aba2db4129bc2e26dfec5b10652 
>   samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java ca913dea79fecbcecdfd1010dc794318055c5764 
> 
> Diff: https://reviews.apache.org/r/51346/diff/
> 
> 
> Testing
> -------
> 
> Unit tests cover in-order processing, out-of-order processing, and commit semantics.
> 
> 
> Thanks,
> 
> Jagadish Venkatraman
> 
>


Re: Review Request 51346: SAMZA-974 - Support finite datasources in Samza that have a notion of End-Of-Stream

Posted by Chris Pettitt <cp...@linkedin.com>.

> On Aug. 26, 2016, 7:51 p.m., Chris Pettitt wrote:
> > samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java, line 31
> > <https://reviews.apache.org/r/51346/diff/4/?file=1486375#file1486375line31>
> >
> >     How likely are we to collide with this? That's the problem with using a user-definable token. I see two options:
> >     
> >     If null is not supported (and thus not usable by user-defined implementations) I would use that and mark it as reserved.
> >     
> >     Otherwise I would probably do something more to make this unlikely to collide (call me paranoid). Something like use a NUL byte as the first character and document that offsets with such an encoding are reserved. I would also check that this sort of string doesn't make it to user code in the task.
> 
> Jagadish Venkatraman wrote:
>     Returning a null is not possible (because a null offset could mean that we don't have messages at this moment instead of meaning end-of-stream. While we should poll again when a consumer returns null, we should not for the END_OF_STREAM case.) Hence, I was hoping to use a special offset.
>     
>     I like your suggestion of using a NUL byte as the first character (and calling that out). I'll update the RB with that.
> 
> Jagadish Venkatraman wrote:
>     There seem to be interoperability issues between strings in Java and strings in Scala (especially around handling NUL bytes in the string; Scala appears to strip out NUL bytes). Hence, I've used "SAMZA_INTERNAL_END_OF_STREAM" as the string. Let me know if there's a better way to handle this.

Scala should definitely not be dropping any bytes. How could it safely do so?

FWIW, you can verify:

```
% scala
scala> "\u0000".length
res0: Int = 1
```


- Chris


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51346/#review147015
-----------------------------------------------------------




Re: Review Request 51346: SAMZA-974 - Support finite datasources in Samza that have a notion of End-Of-Stream

Posted by Jagadish Venkatraman <ja...@gmail.com>.

> On Aug. 26, 2016, 7:51 p.m., Chris Pettitt wrote:
> > samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java, line 31
> > <https://reviews.apache.org/r/51346/diff/4/?file=1486375#file1486375line31>
> >
> >     How likely are we to collide with this? That's the problem with using a user-definable token. I see two options:
> >     
> >     If null is not supported (and thus not usable by user-defined implementations) I would use that and mark it as reserved.
> >     
> >     Otherwise I would probably do something more to make this unlikely to collide (call me paranoid). Something like use a NUL byte as the first character and document that offsets with such an encoding are reserved. I would also check that this sort of string doesn't make it to user code in the task.
> 
> Jagadish Venkatraman wrote:
>     Returning a null is not possible (because a null offset could mean that we don't have messages at this moment instead of meaning end-of-stream. While we should poll again when a consumer returns null, we should not for the END_OF_STREAM case.) Hence, I was hoping to use a special offset.
>     
>     I like your suggestion of using a NUL byte as the first character (and calling that out). I'll update the RB with that.

There seem to be interoperability issues between strings in Java and strings in Scala (especially around handling NUL bytes in the string; Scala appears to strip out NUL bytes). Hence, I've used "SAMZA_INTERNAL_END_OF_STREAM" as the string. Let me know if there's a better way to handle this.


- Jagadish


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51346/#review147015
-----------------------------------------------------------


On Aug. 30, 2016, 12:32 a.m., Jagadish Venkatraman wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/51346/
> -----------------------------------------------------------
> 
> (Updated Aug. 30, 2016, 12:32 a.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Yi Pan (Data Infrastructure), Navina Ramesh, and Xinyu Liu.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Samza currently works with unbounded data sources (kafka streams). However, for bounded data sources like HDFS files, snapshot files which are not infinite, we need a notion of 'end-of-stream'. 
> 
> This is a step towards realizing a 'finite' Samza job that terminates once data processing is complete.(as opposed to an infinite stream job that keeps running)  
> 
> RB changes:
> - New interface for EndOfStreamListener
> - New 'end-of-stream' state in the state-machine of AsyncStreamTask (Invariant: When end-of-stream is reached there are no buffered messages, no-callbacks are in-flight and no-window or commit call shall be in progress)
> - Changes to allow clean shut-downs of the tasks/container/job for end-of-stream.
> 
> Design Doc and Implementation Notes: https://issues.apache.org/jira/secure/attachment/12825119/ProposalforEndofStreaminSamza.pdf
> 
> 
> Diffs
> -----
> 
>   build.gradle 1d4eb74b1294318db8454631ddd0901596121ab2 
>   samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java cc860cf7eb4d514736913c1dceaa80534b61d71a 
>   samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java a8f858aa7e4f4ce436f450cf439fe1a102983c64 
>   samza-api/src/main/java/org/apache/samza/task/EndOfStreamListenerTask.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java a510bb0c5914c772438930d27f100b4d360c1296 
>   samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java 1fc645673b7547b642830df5639c0b4fcd11c0d5 
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 89f6857014489aba2db4129bc2e26dfec5b10652 
>   samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java ca913dea79fecbcecdfd1010dc794318055c5764 
> 
> Diff: https://reviews.apache.org/r/51346/diff/
> 
> 
> Testing
> -------
> 
> Unit tests covering in-order processing, out-of-order processing, and commit semantics.
> 
> 
> Thanks,
> 
> Jagadish Venkatraman
> 
>


Re: Review Request 51346: SAMZA-974 - Support finite datasources in Samza that have a notion of End-Of-Stream

Posted by Chris Pettitt <cp...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51346/#review147015
-----------------------------------------------------------


Fix it, then Ship it!





samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java (line 31)
<https://reviews.apache.org/r/51346/#comment213968>

    How likely are we to collide with this? That's the problem with using a user-definable token. I see two options:
    
    If null is not supported (and thus not usable by user-defined implementations) I would use that and mark it as reserved.
    
    Otherwise, I would probably do something more to make this unlikely to collide (call me paranoid). For example, use a NUL byte as the first character and document that offsets with such an encoding are reserved. I would also check that this sort of string doesn't make it to user code in the task.
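
A minimal sketch of the second option, assuming a NUL-prefixed reserved namespace. All names here are hypothetical, and the author's reply in this thread reports that NUL bytes caused trouble across Java and Scala, so treat this as an illustration of the idea rather than what was adopted.

```java
// Hypothetical sketch: reserve every offset that starts with a NUL character.
final class ReservedOffsets {
    static final char RESERVED_PREFIX = '\u0000';
    static final String END_OF_STREAM = RESERVED_PREFIX + "END_OF_STREAM";

    // True if the offset falls in the reserved (NUL-prefixed) namespace.
    static boolean isReserved(String offset) {
        return offset != null && !offset.isEmpty() && offset.charAt(0) == RESERVED_PREFIX;
    }

    // Guard for the boundary with user code: reject reserved offsets there.
    static String checkUserVisible(String offset) {
        if (isReserved(offset)) {
            throw new IllegalArgumentException("Reserved offset must not reach user code");
        }
        return offset;
    }
}
```

Reserving a whole prefix rather than a single string leaves room for other internal markers later without widening the collision risk.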



samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java (line 60)
<https://reviews.apache.org/r/51346/#comment213969>

    Shouldn't this case be flagged as an error at the point at which it is detected during enqueue, and then dropped? The benefit is that we have fewer states to worry about at layers like these, and we also get a nice stack trace where the error originated.



samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java (line 67)
<https://reviews.apache.org/r/51346/#comment213970>

    Same comment as above.



samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java (line 90)
<https://reviews.apache.org/r/51346/#comment213971>

    OK, I take back the part about a nice stack trace... You could at least log the consumer that gave you back an end of stream followed by a message.
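
A rough sketch of the enqueue-time check suggested in these comments: once end-of-stream has been seen, any later message is logged together with the consumer that produced it and then dropped. The class and method names are hypothetical and do not mirror the code under review.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.logging.Logger;

// Hypothetical sketch: detect a message that arrives after end-of-stream at enqueue time.
final class EnqueueGuard {
    private static final Logger LOG = Logger.getLogger(EnqueueGuard.class.getName());
    static final String END_OF_STREAM = "SAMZA_INTERNAL_END_OF_STREAM";

    private final Queue<String> buffer = new ArrayDeque<>();
    private boolean endOfStreamSeen = false;

    // Returns true if the offset was buffered, false if it was dropped.
    boolean enqueue(String consumerName, String offset) {
        if (endOfStreamSeen) {
            // Name the misbehaving consumer, since no useful stack trace exists here.
            LOG.warning("Consumer " + consumerName + " returned offset " + offset
                    + " after end of stream; dropping it.");
            return false;
        }
        if (END_OF_STREAM.equals(offset)) {
            endOfStreamSeen = true;
        }
        buffer.add(offset);
        return true;
    }

    int size() {
        return buffer.size();
    }
}
```

With the check at this layer, code that iterates over the buffer never has to handle the "message after end-of-stream" state.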



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (lines 363 - 365)
<https://reviews.apache.org/r/51346/#comment213972>

    We're distributing this end of stream knowledge in a few places. It actually looks totally unnecessary here? I think you could always call task.endOfStream and let the task decide if it needs to do anything as a result.



samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala (line 174)
<https://reviews.apache.org/r/51346/#comment213973>

    I would just check if you can coerce here. I don't think adding a second representation of that state is buying us much (and it would be bad for it to diverge from the truth).
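
One way to read this suggestion, sketched with stand-in types: instead of caching a boolean that records whether the task listens for end-of-stream, test the task's type at the point of use. `SketchTask`, `SketchEndOfStreamListener`, and `onEndOfStream` are hypothetical names, not the interfaces in the diff.

```java
// Hypothetical stand-ins for the task interfaces discussed in this review.
interface SketchTask {
    void process(String message);
}

interface SketchEndOfStreamListener {
    void onEndOfStream();
}

// A task that opts in to end-of-stream notification by implementing the listener.
final class CountingTask implements SketchTask, SketchEndOfStreamListener {
    int endOfStreamCalls = 0;

    @Override
    public void process(String message) { }

    @Override
    public void onEndOfStream() {
        endOfStreamCalls++;
    }
}

final class TaskInstanceSketch {
    private final SketchTask task;

    TaskInstanceSketch(SketchTask task) {
        this.task = task;
    }

    // Returns true if the task was notified. The type check is the single
    // source of truth, so no separate flag can drift out of sync with it.
    boolean endOfStream() {
        if (task instanceof SketchEndOfStreamListener) {
            ((SketchEndOfStreamListener) task).onEndOfStream();
            return true;
        }
        return false;
    }
}
```

Callers can then invoke `endOfStream()` unconditionally and let the task instance decide whether anything needs to happen.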



samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java (line 360)
<https://reviews.apache.org/r/51346/#comment213974>

    This line is a bit long; you could break it up like this, which arguably improves readability:
    
    ```
    when(consumerMultiplexer.choose(false))
        .thenReturn(envelope0)
        .thenReturn(envelope1)
        ...
    ```



samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java (line 368)
<https://reviews.apache.org/r/51346/#comment213975>

    Interesting - should end of stream actually count as a message?


- Chris Pettitt


On Aug. 25, 2016, 11:52 p.m., Jagadish Venkatraman wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/51346/
> -----------------------------------------------------------
> 
> (Updated Aug. 25, 2016, 11:52 p.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Yi Pan (Data Infrastructure), Navina Ramesh, and Xinyu Liu.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Samza currently works with unbounded data sources (Kafka streams). However, for bounded data sources such as HDFS files or snapshot files, which are not infinite, we need a notion of 'end-of-stream'.
> 
> This is a step towards realizing a 'finite' Samza job that terminates once data processing is complete (as opposed to an infinite stream job that keeps running).
> 
> RB changes:
> - New interface for EndOfStreamListener
> - New 'end-of-stream' state in the state machine of AsyncStreamTask (Invariant: when end-of-stream is reached, there are no buffered messages, no callbacks are in flight, and no window or commit call is in progress)
> - Changes to allow clean shut-downs of the tasks/container/job for end-of-stream.
> 
> Design Doc and Implementation Notes: https://issues.apache.org/jira/secure/attachment/12825119/ProposalforEndofStreaminSamza.pdf
> 
> 
> Diffs
> -----
> 
>   samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java cc860cf7eb4d514736913c1dceaa80534b61d71a 
>   samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java a8f858aa7e4f4ce436f450cf439fe1a102983c64 
>   samza-api/src/main/java/org/apache/samza/task/EndOfStreamListenerTask.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java a510bb0c5914c772438930d27f100b4d360c1296 
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 89f6857014489aba2db4129bc2e26dfec5b10652 
>   samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java ca913dea79fecbcecdfd1010dc794318055c5764 
> 
> Diff: https://reviews.apache.org/r/51346/diff/
> 
> 
> Testing
> -------
> 
> Unit tests covering in-order processing, out-of-order processing, and commit semantics.
> 
> 
> Thanks,
> 
> Jagadish Venkatraman
> 
>


Re: Review Request 51346: SAMZA-974 - Support finite datasources in Samza that have a notion of End-Of-Stream

Posted by Jagadish Venkatraman <ja...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51346/
-----------------------------------------------------------

(Updated Aug. 25, 2016, 11:52 p.m.)


Review request for samza, Boris Shkolnik, Chris Pettitt, Yi Pan (Data Infrastructure), Navina Ramesh, and Xinyu Liu.


Changes
-------

- Minor method-name refactoring


Repository: samza


Description
-------

Samza currently works with unbounded data sources (Kafka streams). However, for bounded data sources such as HDFS files or snapshot files, which are not infinite, we need a notion of 'end-of-stream'.

This is a step towards realizing a 'finite' Samza job that terminates once data processing is complete (as opposed to an infinite stream job that keeps running).

RB changes:
- New interface for EndOfStreamListener
- New 'end-of-stream' state in the state machine of AsyncStreamTask (Invariant: when end-of-stream is reached, there are no buffered messages, no callbacks are in flight, and no window or commit call is in progress)
- Changes to allow clean shut-downs of the tasks/container/job for end-of-stream.
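
The invariant in the list above can be written as a single predicate. This is a sketch under assumed field names; it does not reflect how AsyncRunLoop actually tracks its state.

```java
// Hypothetical sketch of the end-of-stream invariant for one task.
final class TaskStateSketch {
    boolean endOfStreamReceived;
    int bufferedMessages;
    int callbacksInFlight;
    boolean windowInProgress;
    boolean commitInProgress;

    // The task counts as being at end-of-stream only when the marker has
    // arrived and all outstanding work has drained.
    boolean isAtEndOfStream() {
        return endOfStreamReceived
                && bufferedMessages == 0
                && callbacksInFlight == 0
                && !windowInProgress
                && !commitInProgress;
    }
}
```

Under this reading, receiving the end-of-stream marker is necessary but not sufficient: the state is entered only after in-flight work completes.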

Design Doc and Implementation Notes: https://issues.apache.org/jira/secure/attachment/12825119/ProposalforEndofStreaminSamza.pdf


Diffs (updated)
-----

  samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java cc860cf7eb4d514736913c1dceaa80534b61d71a 
  samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java a8f858aa7e4f4ce436f450cf439fe1a102983c64 
  samza-api/src/main/java/org/apache/samza/task/EndOfStreamListenerTask.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java a510bb0c5914c772438930d27f100b4d360c1296 
  samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 89f6857014489aba2db4129bc2e26dfec5b10652 
  samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java ca913dea79fecbcecdfd1010dc794318055c5764 

Diff: https://reviews.apache.org/r/51346/diff/


Testing
-------

Unit tests covering in-order processing, out-of-order processing, and commit semantics.


Thanks,

Jagadish Venkatraman


Re: Review Request 51346: SAMZA-974 - Support finite datasources in Samza that have a notion of End-Of-Stream

Posted by Jagadish Venkatraman <ja...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51346/
-----------------------------------------------------------

(Updated Aug. 25, 2016, 11:43 p.m.)


Review request for samza, Boris Shkolnik, Chris Pettitt, Yi Pan (Data Infrastructure), Navina Ramesh, and Xinyu Liu.


Changes
-------

- Addressed most of Xinyu and Prateek's review feedback.


Repository: samza


Description
-------

Samza currently works with unbounded data sources (Kafka streams). However, for bounded data sources such as HDFS files or snapshot files, which are not infinite, we need a notion of 'end-of-stream'.

This is a step towards realizing a 'finite' Samza job that terminates once data processing is complete (as opposed to an infinite stream job that keeps running).

RB changes:
- New interface for EndOfStreamListener
- New 'end-of-stream' state in the state machine of AsyncStreamTask (Invariant: when end-of-stream is reached, there are no buffered messages, no callbacks are in flight, and no window or commit call is in progress)
- Changes to allow clean shut-downs of the tasks/container/job for end-of-stream.

Design Doc and Implementation Notes: https://issues.apache.org/jira/secure/attachment/12825119/ProposalforEndofStreaminSamza.pdf


Diffs (updated)
-----

  samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java cc860cf7eb4d514736913c1dceaa80534b61d71a 
  samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java a8f858aa7e4f4ce436f450cf439fe1a102983c64 
  samza-api/src/main/java/org/apache/samza/task/EndOfStreamListenerTask.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java a510bb0c5914c772438930d27f100b4d360c1296 
  samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 89f6857014489aba2db4129bc2e26dfec5b10652 
  samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java ca913dea79fecbcecdfd1010dc794318055c5764 

Diff: https://reviews.apache.org/r/51346/diff/


Testing
-------

Unit tests covering in-order processing, out-of-order processing, and commit semantics.


Thanks,

Jagadish Venkatraman