Posted to dev@samza.apache.org by Martin Kleppmann <mk...@linkedin.com> on 2014/04/05 00:46:42 UTC

Re: Review Request 19384: SAMZA-179 Allow a task to detect when it has caught up, and shut down gracefully.


> On March 25, 2014, 4:34 a.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala, line 86
> > <https://reviews.apache.org/r/19384/diff/3/?file=532921#file532921line86>
> >
> >     I think I prefer having a StreamMetadataCache object get passed in via the constructor, rather than using statics. It makes things more mockable. Can we make StreamMetadataCache work like a normal object, and just have SamzaContainer create one, and pass it into all the TaskInstances?

Ok, done on SAMZA-223.


> On March 25, 2014, 4:34 a.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala, line 33
> > <https://reviews.apache.org/r/19384/diff/3/?file=532922#file532922line33>
> >
> >     I think it might be better to make this just a normal class, and create an instance in SamzaContainer that gets passed everywhere. It will make everything more mockable, plus the TTL, clock, and SystemAdmins can be taken as parameters.

Ok, done on SAMZA-223.
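
For illustration, here is a minimal sketch of the shape Chris describes: a plain class whose TTL, clock, and lookup dependency are constructor parameters. The names MetadataFetcher and MetadataCache, the String metadata type, and the default TTL are assumptions made for this example, not the actual SAMZA-223 code.

```scala
// Hypothetical stand-in for whatever performs the real metadata lookup
// (the review mentions SystemAdmins; this trait is not the Samza API).
trait MetadataFetcher {
  def fetch(streamName: String): String
}

// A normal class rather than a static object: the fetcher, TTL and clock
// are all injected, so a test can pass a fake fetcher and a fake clock.
class MetadataCache(
    fetcher: MetadataFetcher,
    ttlMs: Long = 5000L,
    clock: () => Long = () => System.currentTimeMillis()) {

  private case class CacheEntry(value: String, fetchedAtMs: Long)

  private var entries = Map.empty[String, CacheEntry]

  // Returns the cached value while it is younger than the TTL,
  // otherwise fetches again and replaces the entry.
  def get(streamName: String): String = synchronized {
    val now = clock()
    entries.get(streamName) match {
      case Some(entry) if now - entry.fetchedAtMs < ttlMs => entry.value
      case _ =>
        val value = fetcher.fetch(streamName)
        entries = entries + (streamName -> CacheEntry(value, now))
        value
    }
  }
}
```

With this shape the container would create one instance and hand it to each task, and a unit test can advance the fake clock past the TTL to force a refetch.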


> On March 25, 2014, 4:34 a.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala, line 75
> > <https://reviews.apache.org/r/19384/diff/3/?file=532922#file532922line75>
> >
> >     Do we need to lock here? I get needing to lock on write so we don't accidentally lose a CacheEntry, but on read, it seems like locking might not be required since we're using an immutable map.

Agreed, removed this synchronization on SAMZA-223.
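
A rough sketch of the pattern under discussion: writes are serialized so that a concurrent update cannot drop an entry, while reads go straight to the current immutable map without a lock. EntryStore is a hypothetical name, and marking the field @volatile is an assumption of this sketch (it makes the latest map reference visible to readers); the actual change may differ.

```scala
class EntryStore[K, V] {
  // The field always points at an immutable map. @volatile (an assumption
  // of this sketch) ensures readers see the most recently published map.
  @volatile private var entries = Map.empty[K, V]

  // No lock on read: the map a reader obtains can never change under it.
  def get(key: K): Option[V] = entries.get(key)

  // Lock on write: updating is a read-modify-write of the reference, so
  // two unsynchronized writers could otherwise overwrite each other.
  def put(key: K, value: V): Unit = synchronized {
    entries = entries + (key -> value)
  }
}
```

A reader may briefly see a map that lacks an entry written a moment later, which is acceptable for a cache since the worst case is an extra lookup.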


> On March 25, 2014, 4:34 a.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/util/Util.scala, line 98
> > <https://reviews.apache.org/r/19384/diff/3/?file=532925#file532925line98>
> >
> >     Can you do one dot per line here?
> >     
> >     metadata
> >       .getSystemStreamPartitionMetadata
> >       .keys
> >       .map

Ok, done on SAMZA-223.


> On March 25, 2014, 4:34 a.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/util/Util.scala, line 99
> > <https://reviews.apache.org/r/19384/diff/3/?file=532925#file532925line99>
> >
> >     I think you can just do new SystemStreamPartition(systemStream, _)

Good point, thanks. Done on SAMZA-223.
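
To illustrate the two style suggestions above (one dot per line, and the underscore placeholder in place of an explicit lambda), here is a small self-contained sketch. The case classes are simplified stand-ins named after the types in the review, and partitionsOf is a hypothetical helper; neither is the real Samza code.

```scala
// Simplified stand-ins for illustration only.
case class SystemStream(system: String, stream: String)
case class Partition(id: Int)
case class SystemStreamPartition(systemStream: SystemStream, partition: Partition)

object PlaceholderExample {
  // Explicit lambda form.
  def partitionsOfVerbose(
      systemStream: SystemStream,
      partitions: Set[Partition]): Set[SystemStreamPartition] = {
    partitions
      .map(partition => new SystemStreamPartition(systemStream, partition))
  }

  // Placeholder form: the underscore stands for the single lambda argument,
  // so this is equivalent to the method above.
  def partitionsOf(
      systemStream: SystemStream,
      partitions: Set[Partition]): Set[SystemStreamPartition] = {
    partitions
      .map(new SystemStreamPartition(systemStream, _))
  }
}
```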


- Martin


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/19384/#review38411
-----------------------------------------------------------


On March 22, 2014, 8:18 p.m., Martin Kleppmann wrote:
> 
> (Updated March 22, 2014, 8:18 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> SAMZA-179 Add caching of system metadata lookups
> 
> 
> SAMZA-179 Rename ReadableCoordinator to TaskCoordinators; improve shutdown method signature
> 
> 
> SAMZA-179 Allow a task to detect when it has caught up, and shut down gracefully.
> 
> 
> Diffs
> -----
> 
>   build.gradle fc596267e38c53cfc44f1cf52f8d6acedc848da8 
>   gradle/dependency-versions.gradle 612670dded2c2d290f1bc36ece1f3d53ffa4e971 
>   samza-api/src/main/java/org/apache/samza/task/TaskContext.java 611507ed340d9a3fb7b8cd5e0e0ce37b5561da32 
>   samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java 192b226c9e6105cf666d484ac33bb0f854de7688 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala c101b59f3e476dcc2e3b7870d53d0d36002f2434 
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala c4b135c0f46edaa7fc0e4c0bf909e1ffa9515242 
>   samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala aaf631ec7acd710ab8a5b288f696233223569b60 
>   samza-core/src/main/scala/org/apache/samza/task/TaskCoordinators.scala PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala 5b429dfece38d877175fa0495db6450e01d82689 
>   samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala a2d5820e9eeb7590d208cf7fb7025c589f451bca 
>   samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 27b4ca5d7995536aa66f63b7329caddf41865bb4 
>   samza-core/src/test/scala/org/apache/samza/system/TestStreamMetadataCache.scala PRE-CREATION 
>   samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala c45ed9bb1b3916cd4c4043e36272b4e3508bfb87 
>   samza-core/src/test/scala/org/apache/samza/task/TestTaskCoordinators.scala PRE-CREATION 
>   samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala 3dc263011b955cfccac83e71384b865f8fc2b722 
> 
> Diff: https://reviews.apache.org/r/19384/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Martin Kleppmann
> 
>