Posted to dev@samza.apache.org by Jagadish Venkatraman <ja...@gmail.com> on 2016/07/18 16:29:34 UTC

Review Request 50143: Support the notion of 'end-of-stream' in Samza when consuming from finite sources

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/50143/
-----------------------------------------------------------

Review request for samza, Boris Shkolnik, Chris Pettitt, Fred Ji, Jake Maes, Yi Pan (Data Infrastructure), Navina Ramesh, and Xinyu Liu.


Repository: samza


Description
-------

Samza currently works with unbounded data sources. However, for bounded data sources such as HDFS files or snapshot files, which are finite, we need a notion of 'end-of-stream'.
The following are the logical tasks:
1. SystemConsumer will indicate to Samza that the end of stream has been reached for an SSP (by constructing an envelope with eof set to true).
2. Samza will shut down the task if all SSPs in the task are at end of stream.
3. Samza will provide a callback to the task so that it can perform cleanup or commits once it is at end of stream.
4. Samza will shut down the container if all tasks in the container have been shut down.
5. Samza will ultimately shut down the job if all containers in the job have been shut down.
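
Steps 1 and 2 above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the code in the diff: `Envelope` and `TaskEndOfStreamTracker` are hypothetical names, and an SSP is simplified to a plain String.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for an incoming envelope; this is NOT Samza's IncomingMessageEnvelope.
final class Envelope {
    final String ssp;          // system-stream-partition, simplified to a String
    final boolean endOfStream; // true when the consumer signals that the SSP is exhausted

    Envelope(String ssp, boolean endOfStream) {
        this.ssp = ssp;
        this.endOfStream = endOfStream;
    }
}

// Hypothetical per-task tracker: the task is finished once every assigned SSP is at end of stream.
final class TaskEndOfStreamTracker {
    private final Set<String> pending;

    TaskEndOfStreamTracker(Set<String> assignedSsps) {
        this.pending = new HashSet<>(assignedSsps);
    }

    // Returns true if, after this envelope, all SSPs of the task are at end of stream.
    boolean process(Envelope envelope) {
        if (envelope.endOfStream) {
            pending.remove(envelope.ssp);
        }
        return pending.isEmpty();
    }
}

final class EndOfStreamDemo {
    public static void main(String[] args) {
        Set<String> ssps = new HashSet<>();
        ssps.add("input.0");
        ssps.add("input.1");
        TaskEndOfStreamTracker tracker = new TaskEndOfStreamTracker(ssps);

        // A regular message does not change the end-of-stream state.
        System.out.println(tracker.process(new Envelope("input.0", false))); // false
        // One SSP finished, the other is still pending.
        System.out.println(tracker.process(new Envelope("input.0", true)));  // false
        // Last SSP finished: the task can now be shut down.
        System.out.println(tracker.process(new Envelope("input.1", true)));  // true
    }
}
```

The same "all children finished" check would apply one level up for steps 4 and 5 (tasks within a container, containers within a job).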

This is a step towards realizing a 'finite' Samza job that terminates (as opposed to an infinite stream job that keeps running) once data processing is complete.


=== This RB is an RFC for design feedback ===

TODO:
1. Add more unit tests
2. Verify behavior with multiple containers


Diffs
-----

  samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java cc860cf7eb4d514736913c1dceaa80534b61d71a 
  samza-api/src/main/java/org/apache/samza/task/EndOfStreamListenerTask.java PRE-CREATION 
  samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala d32a92976e43ca24033b48c91851ee706de7de6b 
  samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala e280daa9626757cb4d17c0c03eed923277230c3e 

Diff: https://reviews.apache.org/r/50143/diff/


Testing
-------

Added a unit test and verified that an end-of-stream message terminates the run loop.


Thanks,

Jagadish Venkatraman


Re: Review Request 50143: Support the notion of 'end-of-stream' in Samza when consuming from finite sources

Posted by Jagadish Venkatraman <ja...@gmail.com>.

> On July 18, 2016, 5:12 p.m., Xinyu Liu wrote:
> > samza-api/src/main/java/org/apache/samza/task/EndOfStreamListenerTask.java, line 16
> > <https://reviews.apache.org/r/50143/diff/1/?file=1445940#file1445940line16>
> >
> >     Is it always true that the user will need to commit before shutting down the task? (I don't see a use case in which the user would not commit at the end.) Do we really need this API?

Let's say a stream contains the messages [1, 2, 3, 4, EOF], and that auto-commit is turned off. In that scenario, when EOF is delivered, don't you think we need to give the user the ability to control whether to commit or not? (My understanding is that with auto-commit off, we don't invoke commit.)
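
To make the scenario concrete, here is a rough sketch of a task that decides for itself whether to commit when EOF arrives. The `Committer` and `EndOfStreamCallback` interfaces are hypothetical stand-ins for illustration; the actual signature of EndOfStreamListenerTask is in the diff and may differ.

```java
// Hypothetical interfaces for illustration only; not the API proposed in the diff.
interface Committer {
    void commit();
}

interface EndOfStreamCallback {
    void onEndOfStream(Committer committer);
}

// A task that sums its messages and chooses whether to commit once the stream ends.
final class SummingTask implements EndOfStreamCallback {
    private final boolean commitAtEnd;
    private int sum = 0;

    SummingTask(boolean commitAtEnd) {
        this.commitAtEnd = commitAtEnd;
    }

    void process(int message) {
        sum += message;
    }

    int sum() {
        return sum;
    }

    @Override
    public void onEndOfStream(Committer committer) {
        // With auto-commit off, nothing else would commit, so the task decides here.
        if (commitAtEnd) {
            committer.commit();
        }
    }
}

final class EndOfStreamCallbackDemo {
    // Feeds the messages to the task, then delivers EOF; returns how many commits were requested.
    static int run(SummingTask task, int[] messages) {
        int[] commits = {0};
        for (int message : messages) {
            task.process(message);
        }
        task.onEndOfStream(() -> commits[0]++);
        return commits[0];
    }

    public static void main(String[] args) {
        int[] messages = {1, 2, 3, 4};
        System.out.println(run(new SummingTask(true), messages));  // 1
        System.out.println(run(new SummingTask(false), messages)); // 0
    }
}
```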


> On July 18, 2016, 5:12 p.m., Xinyu Liu wrote:
> > samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala, line 85
> > <https://reviews.apache.org/r/50143/diff/1/?file=1445941#file1445941line85>
> >
> >     this is not thread-safe

Good point. This change does not consider multi-threading; I'll make this a concurrent data structure. Thanks for the observation!
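
One possible shape for that fix, as a sketch only: the class and method names below are hypothetical and this is not the TaskInstance code. It assumes the state in question is the set of SSPs that have reached end of stream, and backs it with a concurrent set from `ConcurrentHashMap.newKeySet()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical thread-safe end-of-stream state for one task.
final class ConcurrentEndOfStreamState {
    private final Set<String> assigned;
    // Concurrent set, so several threads can record end of stream without extra locking.
    private final Set<String> finished = ConcurrentHashMap.newKeySet();

    ConcurrentEndOfStreamState(Set<String> assignedSsps) {
        this.assigned = Collections.unmodifiableSet(new HashSet<>(assignedSsps));
    }

    void markEndOfStream(String ssp) {
        // Ignore SSPs that are not assigned to this task.
        if (assigned.contains(ssp)) {
            finished.add(ssp);
        }
    }

    boolean allAtEndOfStream() {
        // finished is always a subset of assigned, so equal sizes means every SSP is done.
        return finished.size() == assigned.size();
    }
}

final class ConcurrentEndOfStreamDemo {
    // Marks each SSP from its own thread and waits for all threads to complete.
    static void markFromSeparateThreads(ConcurrentEndOfStreamState state, List<String> ssps) {
        List<Thread> threads = new ArrayList<>();
        for (String ssp : ssps) {
            Thread thread = new Thread(() -> state.markEndOfStream(ssp));
            threads.add(thread);
            thread.start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
    }

    public static void main(String[] args) {
        List<String> ssps = Arrays.asList("input.0", "input.1", "input.2");
        ConcurrentEndOfStreamState state = new ConcurrentEndOfStreamState(new HashSet<>(ssps));
        markFromSeparateThreads(state, ssps);
        System.out.println(state.allAtEndOfStream()); // true
    }
}
```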


- Jagadish


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/50143/#review142580
-----------------------------------------------------------




Re: Review Request 50143: Support the notion of 'end-of-stream' in Samza when consuming from finite sources

Posted by Xinyu Liu <xi...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/50143/#review142580
-----------------------------------------------------------




samza-api/src/main/java/org/apache/samza/task/EndOfStreamListenerTask.java (line 16)
<https://reviews.apache.org/r/50143/#comment208204>

    Is it always true that the user will need to commit before shutting down the task? (I don't see a use case in which the user would not commit at the end.) Do we really need this API?



samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala (line 78)
<https://reviews.apache.org/r/50143/#comment208202>

    this is not thread-safe



samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala (line 135)
<https://reviews.apache.org/r/50143/#comment208203>

    Please make this thread safe since we might have multiple threads for a task.


- Xinyu Liu

