Posted to dev@samza.apache.org by Navina Ramesh <nr...@linkedin.com> on 2015/08/26 23:49:56 UTC

Review Request 37817: SAMZA-619 - Modify SamzaAppMaster to enable host-affinity

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37817/
-----------------------------------------------------------

Review request for samza, Yan Fang, Chinmay Soman, Chris Riccomini, and Yi Pan (Data Infrastructure).


Repository: samza


Description
-------

This is a major change to how we request and assign containers to resources. It uses a threaded model for requesting and allocating containers; see the javadoc for more details.

Major changes to note:
1. Moved YarnConfig to java config class
2. Removed SamzaAppMasterTaskManager
3. SamzaAppState replaces SamzaAppMasterState
4. SamzaTaskManager is very similar to SamzaAppMasterTaskManager, except that it delegates onContainerAllocated and requestContainers to the thread in ContainerAllocator 
5. Removed state.unclaimedContainers
6. The allocator thread's sleep time and the request timeout are configurable
7. host-affinity can be enabled by using the config "yarn.samza.host-affinity.enabled". Host affinity is guaranteed to work with the FairScheduler when continuous scheduling is enabled. Details on this config can be found in the SAMZA-617 [design](https://issues.apache.org/jira/secure/attachment/12726945/DESIGN-SAMZA-617-2.pdf).
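As a rough illustration of item 7, here is a minimal sketch of how an AM might read `yarn.samza.host-affinity.enabled` and choose between a host-aware and a host-agnostic allocation strategy. The `AllocatorKind` enum, the `AllocatorSelector` class, and the `false` default are assumptions made for illustration; they are not the patch's actual code.

```java
import java.util.Properties;

/** Hypothetical allocation strategies; the patch's real allocator classes may be structured differently. */
enum AllocatorKind { HOST_AGNOSTIC, HOST_AWARE }

final class AllocatorSelector {
  static final String HOST_AFFINITY_ENABLED = "yarn.samza.host-affinity.enabled";

  private AllocatorSelector() { }

  /** Picks HOST_AWARE only when the flag is "true" (case-insensitive); the default of false is assumed. */
  static AllocatorKind select(Properties config) {
    boolean enabled = Boolean.parseBoolean(config.getProperty(HOST_AFFINITY_ENABLED, "false"));
    return enabled ? AllocatorKind.HOST_AWARE : AllocatorKind.HOST_AGNOSTIC;
  }
}
```

Because `Boolean.parseBoolean` treats any value other than "true" as false, a misspelled value silently falls back to host-agnostic allocation in this sketch.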

Pending items:
1. Adding unit tests for AppMaster, TaskManager, ContainerAllocator
2. Update config documentation 
3. Update web-site with info on this feature


Diffs
-----

  build.gradle 3a7fabcd4f4e5c8db950c3a33bba33618c5565b4 
  gradle/dependency-versions.gradle 36d564b6ca895f042ee4802643e49180f4947b62 
  samza-core/src/main/java/org/apache/samza/container/LocalityManager.java c567bf4a1747a7a6aff23be74cf25b1a44e57b42 
  samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java 5455881e2a4a9c865137f2708f655641b93c2bfb 
  samza-core/src/main/java/org/apache/samza/job/model/JobModel.java 7b592740c082225fc217a24188bc874f883db63b 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 61e228b1ff8619da4ad2e11b15a5afb7999c4996 
  samza-core/src/main/scala/org/apache/samza/util/Util.scala c8438bd6ed01a95860867505d84c04c493a5a197 
  samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerFailure.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java PRE-CREATION 
  samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml ce2145adde09b0e8097a1c711bc296ee20392e63 
  samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala 1f51551893c42bb13d7941c8b0e5594ac7f42585 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/ContainerUtil.scala PRE-CREATION 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala af42c6a6636953a95f79837fe372e0dbd735df70 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala d475c4c566f17f88a04db5fbc84cc2e27eb333d2 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala 03acfe1bbbabf8f54be9f36fdae785476da45135 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala 060538623e4d67b986bc635518e7fe8ebdde9e24 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala f667c83a7fff43a5efdc64cde019b2cf35f38cb9 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala 1743c8611a94fa1c7f7dafd8ff8c713039f3df8c 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala 8dd70c9977473e083e463d01b049c40e15b21f4a 
  samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala 09f4dc32a4b18aeb3accb856c360ea2f95c82673 
  samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala 7fd51221149ba59e7ea1bc0b51d58726ec17a7d7 
  samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala df5992e659302d2918c4e2c30b6122ed51ab9fe8 
  samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala 6f4bfaf916d687426f746fd3b09cd56490bb500e 
  samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 2eec65f02826de40493925c08ff344a8cc4feecb 

Diff: https://reviews.apache.org/r/37817/diff/


Testing
-------

Tested locally with hello-samza, and with a sample job on a 3-node YARN cluster. Works as expected.


Thanks,

Navina Ramesh


Re: Review Request 37817: SAMZA-619 - Modify SamzaAppMaster to enable host-affinity

Posted by Navina Ramesh <nr...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37817/#review97517
-----------------------------------------------------------



samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java (line 236)
<https://reviews.apache.org/r/37817/#comment153446>

    Please ignore this test for the time being. I am working on it. Thanks!


- Navina Ramesh


On Sept. 2, 2015, 7:17 p.m., Navina Ramesh wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/37817/
> -----------------------------------------------------------
> 
> (Updated Sept. 2, 2015, 7:17 p.m.)
> 
> 
> Review request for samza, Yan Fang, Chinmay Soman, Chris Riccomini, and Yi Pan (Data Infrastructure).
> 
> 
> Bugs: SAMZA-619
>     https://issues.apache.org/jira/browse/SAMZA-619
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> This is a major change to how we request and assign containers to resources. It uses a threaded model for requesting and allocating containers; see the javadoc for more details.
> 
> Major changes to note:
> 1. Changed the YARN dependency to 2.6
> 2. Moved YarnConfig to java config class
> 3. Removed SamzaAppMasterTaskManager
> 4. SamzaAppState replaces SamzaAppMasterState
> 5. SamzaTaskManager is very similar to SamzaAppMasterTaskManager, except that it delegates onContainerAllocated and requestContainers to the thread in ContainerAllocator
> 6. Removed state.unclaimedContainers
> 7. The allocator thread's sleep time and the request timeout are configurable
> 8. host-affinity can be enabled by using the config "yarn.samza.host-affinity.enabled". Host affinity is guaranteed to work with the FairScheduler when continuous scheduling is enabled. Details on this config can be found in the SAMZA-617 [design](https://issues.apache.org/jira/secure/attachment/12726945/DESIGN-SAMZA-617-2.pdf).
> 
> Pending items:
> 1. Adding unit tests for AppMaster, TaskManager, ContainerAllocator
> 2. Update config documentation 
> 3. Update web-site with info on this feature
> 
> 
> Diffs
> -----
> 
>   build.gradle 3a7fabcd4f4e5c8db950c3a33bba33618c5565b4 
>   docs/learn/documentation/versioned/jobs/configuration-table.html c23d7d37f151a0dbdd71f52588773bc67edf88c8 
>   gradle/dependency-versions.gradle 36d564b6ca895f042ee4802643e49180f4947b62 
>   samza-core/src/main/java/org/apache/samza/container/LocalityManager.java c567bf4a1747a7a6aff23be74cf25b1a44e57b42 
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java 5455881e2a4a9c865137f2708f655641b93c2bfb 
>   samza-core/src/main/java/org/apache/samza/job/model/JobModel.java 7b592740c082225fc217a24188bc874f883db63b 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 61e228b1ff8619da4ad2e11b15a5afb7999c4996 
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala c8438bd6ed01a95860867505d84c04c493a5a197 
>   samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerFailure.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java PRE-CREATION 
>   samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml ce2145adde09b0e8097a1c711bc296ee20392e63 
>   samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala 1f51551893c42bb13d7941c8b0e5594ac7f42585 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala af42c6a6636953a95f79837fe372e0dbd735df70 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala d475c4c566f17f88a04db5fbc84cc2e27eb333d2 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala 03acfe1bbbabf8f54be9f36fdae785476da45135 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala 060538623e4d67b986bc635518e7fe8ebdde9e24 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala f667c83a7fff43a5efdc64cde019b2cf35f38cb9 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala 1743c8611a94fa1c7f7dafd8ff8c713039f3df8c 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala 8dd70c9977473e083e463d01b049c40e15b21f4a 
>   samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala 09f4dc32a4b18aeb3accb856c360ea2f95c82673 
>   samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala 7fd51221149ba59e7ea1bc0b51d58726ec17a7d7 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerRequestState.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaContainerRequest.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerAllocator.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerUtil.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockHttpServer.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockNMClient.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/util/TestAMRMClientImpl.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/util/TestUtil.java PRE-CREATION 
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala df5992e659302d2918c4e2c30b6122ed51ab9fe8 
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala 6f4bfaf916d687426f746fd3b09cd56490bb500e 
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 2eec65f02826de40493925c08ff344a8cc4feecb 
> 
> Diff: https://reviews.apache.org/r/37817/diff/
> 
> 
> Testing
> -------
> 
> Tested locally with hello-samza, and with a sample job on a 3-node YARN cluster. Works as expected.
> 
> 
> Thanks,
> 
> Navina Ramesh
> 
>


Re: Review Request 37817: SAMZA-619 - Modify SamzaAppMaster to enable host-affinity

Posted by Yan Fang <ya...@gmail.com>.

> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java, line 80
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065677#file1065677line80>
> >
> >     this getId is for the global container Id, right?
> 
> Navina Ramesh wrote:
>     What do you mean global container Id? This will be the containerId assigned by Yarn

Yes, by "global container Id" I mean the id assigned by Yarn. You fixed it by renaming the other one to samzaContainerId :)


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java, lines 96-99
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065679#file1065679line96>
> >
> >     1. format the comment a little. :) using 
> >     /*
> >      *
> >      */
> >     
> >     2. this comment does not explain the code following it. Maybe this comment should be part of the allocatedContainers variable javadoc.
> 
> Navina Ramesh wrote:
>     Fixed the formatting. 
>     The comment was intended to explain why we update allocatedContainers when only a containerRequest is made, so I think it is better to leave it here. I have reworded it to explain better.

sure. Leaving it there sounds good.


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java, line 113
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065679#file1065679line113>
> >
> >     Just curious: is it possible that the hostname of the machine differs from the part of the HttpAddress before the ":"?
> 
> Navina Ramesh wrote:
>     what do you mean by different? the format of HttpAddress is "$hostname:$port". We are only interested in the hostname here.

ok. I see.
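Given the "$hostname:$port" format described above, a minimal sketch of the extraction could look like the following. `HostNames` and `hostFromHttpAddress` are hypothetical names, the address is assumed to come from something like YARN's `Container.getNodeHttpAddress()`, and splitting on the last ":" (so that only the trailing port is stripped) is an assumption rather than what the patch does.

```java
/** Hypothetical helper: extracts the host part of a NodeManager HTTP address of the form "host:port". */
final class HostNames {
  private HostNames() { }

  static String hostFromHttpAddress(String httpAddress) {
    int colon = httpAddress.lastIndexOf(':');
    // No port present: treat the whole string as the hostname.
    return colon < 0 ? httpAddress : httpAddress.substring(0, colon);
  }
}
```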


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java, line 116
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065679#file1065679line116>
> >
> >     Can lines 116 to 163 be refactored a little? Right now it looks very confusing to me (too many nested ifs).
> >     
> >     Is something like:
> >     
> >     if (requestCountOnThisHost > 0 && (allocatedContainersOnThisHost == null || allocatedContainersOnThisHost.size() < requestCountOnThisHost)) {
> >       addToAllocatedContainerList(hostName, container);
> >     } else {
> >       addToAllocatedContainerList(ANY_HOST, container);
> >     }
> >     
> >     sufficient for the logic?
> 
> Navina Ramesh wrote:
>     Yeah. It should be sufficient. This is how I had it before, but it became hard to debug because you didn't know why a container was allocated to a particular buffer. That's why I simplified the logic. For now, I think detailed logging adds value until we can make sure that the feature is stable. Do you think that is ok?

yes, it sounds ok to me.
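The single-condition version and the wish for detailed logging are not mutually exclusive. Below is a sketch of one way to get both, assuming a plain per-host request count and per-host buffers. The `AllocationBuffer` class, its field types, string container ids, and the use of `System.out` in place of a real logger are all assumptions for illustration (Java 8 collection methods are used).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified stand-in for the buffering logic in ContainerRequestState. */
final class AllocationBuffer {
  static final String ANY_HOST = "ANY_HOST";

  // host -> number of outstanding requests that prefer that host
  final Map<String, Integer> requestsToCountMap = new HashMap<>();
  // host (or ANY_HOST) -> containers buffered under that key
  final Map<String, List<String>> allocatedContainers = new HashMap<>();

  /** Buffers a container under its host if a request for that host is still unmet, else under ANY_HOST. */
  String addContainer(String hostName, String containerId) {
    int requestCountOnThisHost = requestsToCountMap.getOrDefault(hostName, 0);
    List<String> allocatedOnThisHost = allocatedContainers.get(hostName);
    int allocatedCount = allocatedOnThisHost == null ? 0 : allocatedOnThisHost.size();

    String bucket = (requestCountOnThisHost > 0 && allocatedCount < requestCountOnThisHost)
        ? hostName : ANY_HOST;
    // One log line that records why the container went to this bucket.
    System.out.printf("container %s on %s -> %s (requests=%d, allocated=%d)%n",
        containerId, hostName, bucket, requestCountOnThisHost, allocatedCount);
    allocatedContainers.computeIfAbsent(bucket, k -> new ArrayList<>()).add(containerId);
    return bucket;
  }
}
```

Logging the inputs of the condition alongside the chosen bucket keeps the branch compact while still explaining each decision.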


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java, line 280
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065679#file1065679line280>
> >
> >     This does not seem safe to me.
> 
> Navina Ramesh wrote:
>     why? how do you suggest I can change it?

How about returning Collections.unmodifiableMap() and Collections.unmodifiableList() views?
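A minimal sketch of that suggestion, assuming a hypothetical `RequestStateView` class that keeps host-to-container lists internally. `getContainersOnAHost` is the method name mentioned elsewhere in this review, but the body below is illustrative, not the patch's code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical holder showing read-only accessors over internal mutable state. */
final class RequestStateView {
  private final Map<String, List<String>> allocatedContainers = new HashMap<>();

  void add(String host, String containerId) {
    List<String> list = allocatedContainers.get(host);
    if (list == null) {
      list = new ArrayList<>();
      allocatedContainers.put(host, list);
    }
    list.add(containerId);
  }

  /** Read-only view of the whole map; callers cannot add or remove hosts. */
  Map<String, List<String>> getAllocatedContainers() {
    return Collections.unmodifiableMap(allocatedContainers);
  }

  /** Read-only view of one host's list, or an empty list if the host is unknown. */
  List<String> getContainersOnAHost(String host) {
    List<String> list = allocatedContainers.get(host);
    return list == null ? Collections.<String>emptyList() : Collections.unmodifiableList(list);
  }
}
```

Two caveats: these are views, not snapshots, so callers still see later changes (which matters if another thread mutates the state), and `unmodifiableMap` does not make the inner lists read-only.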


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java, lines 79-80
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065680#file1065680line79>
> >
> >     It is worth differentiating between containerId and container.getId.
> 
> Navina Ramesh wrote:
>     Ok. I will rename containerId to samzaContainerId. Does that sound ok?

yes, definitely.


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java, lines 157-161
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065684#file1065684line157>
> >
> >     can this be simplified as 
> >     if (state.runningContainers.containsValue(containerStatus.getContainerId()), then ...
> 
> Navina Ramesh wrote:
>     Yeah, but we are interested in the key for that entry. If you use an if-condition to check whether the value is present, how will you return the key? You won't have any handle or reference to the map entry. Also, we are not comparing the value directly; we are checking a field of the entry's value. Hence, an if-statement alone is insufficient.

you are right.
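To make the point concrete: `containsValue` only says whether a match exists, while this lookup needs the key and compares a field of the value. A sketch of the entry-set loop, assuming a hypothetical `RunningContainer` value type keyed by the integer Samza container id, with `-1` as an assumed "not found" sentinel:

```java
import java.util.Map;

/** Hypothetical stand-in for the AM's record of a running container. */
final class RunningContainer {
  final String yarnContainerId;

  RunningContainer(String yarnContainerId) {
    this.yarnContainerId = yarnContainerId;
  }
}

final class RunningContainers {
  private RunningContainers() { }

  /** Returns the Samza container id whose running container has the given YARN id, or -1 if none. */
  static int findSamzaContainerId(Map<Integer, RunningContainer> runningContainers, String yarnContainerId) {
    for (Map.Entry<Integer, RunningContainer> entry : runningContainers.entrySet()) {
      // Compare a field of the value, then hand back the entry's key.
      if (entry.getValue().yarnContainerId.equals(yarnContainerId)) {
        return entry.getKey();
      }
    }
    return -1;
  }
}
```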


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala, line 74
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065693#file1065693line74>
> >
> >     It should be amJavaHome.isEmpty(), because we set "" as the default value in YarnConfig.
> 
> Navina Ramesh wrote:
>     We are setting the default to "null" in YarnConfig at line 149, aren't we?

oh, yes, you are right. I thought it was "AmOpt", which is "". :)


- Yan


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37817/#review98316
-----------------------------------------------------------


On Sept. 20, 2015, 4:45 a.m., Navina Ramesh wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/37817/
> -----------------------------------------------------------
> 
> (Updated Sept. 20, 2015, 4:45 a.m.)
> 
> 
> Review request for samza, Yan Fang, Chinmay Soman, Chris Riccomini, and Yi Pan (Data Infrastructure).
> 
> 
> Bugs: SAMZA-619
>     https://issues.apache.org/jira/browse/SAMZA-619
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> This is a major change to how we request and assign containers to resources. It uses a threaded model for requesting and allocating containers; see the javadoc for more details.
> 
> Major changes to note:
> 1. Changed the YARN dependency to 2.6
> 2. Moved YarnConfig to java config class
> 3. Removed SamzaAppMasterTaskManager
> 4. SamzaAppState replaces SamzaAppMasterState
> 5. SamzaTaskManager is very similar to SamzaAppMasterTaskManager, except that it delegates onContainerAllocated and requestContainers to the thread in ContainerAllocator
> 6. Removed state.unclaimedContainers
> 7. The allocator thread's sleep time and the request timeout are configurable
> 8. host-affinity can be enabled by using the config "yarn.samza.host-affinity.enabled". Host affinity is guaranteed to work with the FairScheduler when continuous scheduling is enabled. Details on this config can be found in the SAMZA-617 [design](https://issues.apache.org/jira/secure/attachment/12726945/DESIGN-SAMZA-617-2.pdf).
> 9. Added unit tests for TaskManager & ContainerAllocator
> 10. Updated config documentation
> 11. Removed TestSamzaAppMasterTaskManager.scala
> 
> Pending items:
> 1. Update web-site with info on this feature (SAMZA-668)
> 
> 
> Diffs
> -----
> 
>   build.gradle 3a7fabcd4f4e5c8db950c3a33bba33618c5565b4 
>   docs/learn/documentation/versioned/jobs/configuration-table.html c23d7d37f151a0dbdd71f52588773bc67edf88c8 
>   gradle/dependency-versions.gradle 36d564b6ca895f042ee4802643e49180f4947b62 
>   samza-core/src/main/java/org/apache/samza/container/LocalityManager.java c567bf4a1747a7a6aff23be74cf25b1a44e57b42 
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java 5455881e2a4a9c865137f2708f655641b93c2bfb 
>   samza-core/src/main/java/org/apache/samza/job/model/JobModel.java 7b592740c082225fc217a24188bc874f883db63b 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 61e228b1ff8619da4ad2e11b15a5afb7999c4996 
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala c8438bd6ed01a95860867505d84c04c493a5a197 
>   samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerFailure.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java PRE-CREATION 
>   samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml ce2145adde09b0e8097a1c711bc296ee20392e63 
>   samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala 1f51551893c42bb13d7941c8b0e5594ac7f42585 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala af42c6a6636953a95f79837fe372e0dbd735df70 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala d475c4c566f17f88a04db5fbc84cc2e27eb333d2 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala 03acfe1bbbabf8f54be9f36fdae785476da45135 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala 060538623e4d67b986bc635518e7fe8ebdde9e24 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala f667c83a7fff43a5efdc64cde019b2cf35f38cb9 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala 1743c8611a94fa1c7f7dafd8ff8c713039f3df8c 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala 8dd70c9977473e083e463d01b049c40e15b21f4a 
>   samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala 09f4dc32a4b18aeb3accb856c360ea2f95c82673 
>   samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala 7fd51221149ba59e7ea1bc0b51d58726ec17a7d7 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerRequestState.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaContainerRequest.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerAllocator.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerUtil.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockHttpServer.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockNMClient.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/util/TestAMRMClientImpl.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/util/TestUtil.java PRE-CREATION 
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMaster.scala 7b7d86a43c69e72c47eaa91f68be24e0f4022891 
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala df5992e659302d2918c4e2c30b6122ed51ab9fe8 
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala 6f4bfaf916d687426f746fd3b09cd56490bb500e 
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 2eec65f02826de40493925c08ff344a8cc4feecb 
> 
> Diff: https://reviews.apache.org/r/37817/diff/
> 
> 
> Testing
> -------
> 
> Tested locally with hello-samza, and with a sample job on a 3-node YARN cluster. Works as expected.
> 
> 
> Thanks,
> 
> Navina Ramesh
> 
>


Re: Review Request 37817: SAMZA-619 - Modify SamzaAppMaster to enable host-affinity

Posted by Navina Ramesh <nr...@linkedin.com>.

> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-core/src/main/java/org/apache/samza/job/model/JobModel.java, lines 65-72
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065672#file1065672line65>
> >
> >     Duplicated code with lines 116-122.

Ok. Fixed it.


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-core/src/main/scala/org/apache/samza/util/Util.scala, line 44
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065674#file1065674line44>
> >
> >     Not needed: 1) it is already very short; 2) sometimes you use clock and sometimes you call System.currentTimeMillis. Why not just call System.currentTimeMillis everywhere?

Sure. Makes sense. We can call System.currentTimeMillis everywhere. That said, I think this statement in our coding guidelines is pretty useful to have: "Do not use sleep or other timing assumptions in tests, it is always, always, always wrong and will fail intermittently on any test server with other things going on that causes delays. Write tests in such a way that they are not timing dependent. Seriously. One thing that will help this is to never directly use the system clock in code (i.e. System.currentTimeMillis) but instead to use getTime: () => Long, so that time can be mocked."
I don't have a strong opinion, because I don't think we are using it in any of our unit tests (yet), and we might have to give up the Scala way of doing this until we move to JDK 1.8.
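For reference, the guideline's `getTime: () => Long` idea maps onto Java 8's `java.util.function.LongSupplier`. The sketch below applies it to a request-timeout check; `RequestTimeout`, its constructor, and the "strictly greater than the timeout" rule are assumptions for illustration, not the patch's code.

```java
import java.util.function.LongSupplier;

/** Hypothetical request-timeout check whose notion of "now" is injected so tests can control it. */
final class RequestTimeout {
  private final LongSupplier clock;
  private final long timeoutMs;

  RequestTimeout(LongSupplier clock, long timeoutMs) {
    this.clock = clock;
    this.timeoutMs = timeoutMs;
  }

  /** Production wiring: read the real system clock. */
  static RequestTimeout withSystemClock(long timeoutMs) {
    return new RequestTimeout(System::currentTimeMillis, timeoutMs);
  }

  /** True once more than timeoutMs has elapsed since the request was made. */
  boolean isExpired(long requestTimestampMs) {
    return clock.getAsLong() - requestTimestampMs > timeoutMs;
  }
}
```

A test can then drive time explicitly, for example `new RequestTimeout(() -> now[0], 500L)` with a mutable `long[] now`, instead of sleeping.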


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java, line 35
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065676#file1065676line35>
> >
> >     if we are forced to implement the Runnable anyway, why not implement the interface in this abstract class?

Ah yeah. I should have just added it to the abstract class!
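A sketch of the shape discussed here, assuming a hypothetical `AbstractAllocatorSketch` base class: the base implements `Runnable` once, and subclasses (for example host-aware vs. host-agnostic) supply only the matching policy. `releaseExtraContainers` borrows the name used later in this review, and placing the sleep at the end of each pass follows the reviewer's suggestion; everything else is illustrative.

```java
/** Hypothetical sketch: the abstract base implements Runnable, so subclasses only supply the policy. */
abstract class AbstractAllocatorSketch implements Runnable {
  private final long sleepMs;
  private volatile boolean running = true;

  AbstractAllocatorSketch(long sleepMs) {
    this.sleepMs = sleepMs;
  }

  /** Subclass policy, e.g. host-aware vs. host-agnostic matching of requests to containers. */
  protected abstract void assignContainerRequests();

  /** Hook for returning surplus containers to the RM; a no-op in this sketch. */
  protected void releaseExtraContainers() { }

  @Override
  public void run() {
    while (running) {
      assignContainerRequests();
      releaseExtraContainers();
      // Sleep at the end of the pass so the first pass runs immediately.
      try {
        Thread.sleep(sleepMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        running = false;
      }
    }
  }

  void stop() {
    running = false;
  }
}
```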


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java, lines 67-68
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065676#file1065676line67>
> >
> >     To be more precise: when a host-affinity-enabled job runs for the first time, the locality value is null too.

Ok. Changed it.
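The null case can be folded into a single fallback. Here is a sketch assuming a hypothetical `PreferredHosts` helper over a map from Samza container id to last known host, using the `ANY_HOST = "ANY_HOST"` sentinel suggested later in this review:

```java
import java.util.Map;

/** Hypothetical lookup of a container's preferred host from the last known locality mapping. */
final class PreferredHosts {
  static final String ANY_HOST = "ANY_HOST";

  private PreferredHosts() { }

  static String preferredHost(Map<Integer, String> containerToHost, int samzaContainerId) {
    String host = containerToHost.get(samzaContainerId);
    // No mapping, e.g. on the first run of a host-affinity-enabled job: accept any host.
    return host == null ? ANY_HOST : host;
  }
}
```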


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java, line 89
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065676#file1065676line89>
> >
> >     -> */

Changed it.


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java, line 37
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065677#file1065677line37>
> >
> >     just follow the same code style: start the variable with a letter

ok


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java, line 69
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065677#file1065677line69>
> >
> >     It may be better to put the sleep after containerRequestState.releaseExtraContainers(), because there is no reason to sleep at the very beginning.

At the beginning, it is likely that no container requests have been submitted to the RM yet, so it doesn't really matter where the sleep goes. I will move it to the end if that makes the logic easier to understand.


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java, line 71
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065677#file1065677line71>
> >
> >     getContainersOnAHost returns allocatedContainers, not availableContainers. So what is the difference between allocatedContainers and availableContainers?

It should be allocatedContainers. There is no difference. This is a result of my refactoring. Thanks for raising this point.


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java, line 80
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065677#file1065677line80>
> >
> >     this getId is for the global container Id, right?

What do you mean global container Id? This will be the containerId assigned by Yarn


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java, line 84
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065677#file1065677line84>
> >
> >     I do not understand this comment. Why are there any extra containers? Do we request more containers than needed? Or is this only for host-affinity-enabled jobs?

Yeah. This is precisely the reason why this patch took so long :(
Basically, container allocation in YARN is fully asynchronous: the AM invokes "allocate" on the RM with every heartbeat, and each allocate call carries the up-to-date state of the container requests we have made. Since an allocated container is handled asynchronously in the AM callback thread, the AM may heartbeat to the RM again while that container is still being started. The RM then considers the container request still valid and assigns another container. To avoid excessive container allocation for a single request, it looks like the client (the AM in this case) has to do its own bookkeeping of request state and remove any container request that has been satisfied. Unless the client explicitly removes the request from the local queue (I am referring to the queue maintained by the AMRMClient), the RM continues to assign containers. This happens even today, because we do not remove requests from the local queue.
Folks in Slider and Spark have also come across this behavior and changed their AM logic. It's time for Samza to catch up :)
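The bookkeeping described above can be modeled without YARN. Below is a minimal sketch, assuming a hypothetical `RequestBookkeeper` with FIFO matching that ignores host preferences. The comments mark where a real AM would call `AMRMClient`'s `addContainerRequest`, `removeContainerRequest`, and `releaseAssignedContainer`; the patch's ContainerRequestState does considerably more than this.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical, YARN-free model of client-side container request bookkeeping. */
final class RequestBookkeeper {
  private final Deque<String> pendingRequests = new ArrayDeque<>(); // outstanding request ids
  private final List<String> extraContainers = new ArrayList<>();   // allocations with no matching request

  void request(String requestId) {
    pendingRequests.addLast(requestId); // real AM: addContainerRequest(...)
  }

  /** Handles one allocation; returns the satisfied request id, or null if the container is surplus. */
  String onContainerAllocated(String containerId) {
    String satisfied = pendingRequests.pollFirst();
    if (satisfied == null) {
      // The RM granted more than we still need, e.g. because a heartbeat went out
      // before the previous allocation was processed.
      extraContainers.add(containerId); // real AM: releaseAssignedContainer(...)
      return null;
    }
    // Real AM: removeContainerRequest(...) so later heartbeats stop asking for it.
    return satisfied;
  }

  int pendingCount() {
    return pendingRequests.size();
  }

  List<String> extraContainers() {
    return extraContainers;
  }
}
```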

Here are some relevant JIRAs for you to look at :
https://issues.apache.org/jira/browse/SLIDER-828
https://issues.apache.org/jira/browse/SPARK-2687
https://issues.apache.org/jira/browse/YARN-1902

Let me know if you have more questions.


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java, line 41
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065679#file1065679line41>
> >
> >     remove _

removed


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java, lines 88-89
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065677#file1065677line88>
> >
> >     change to log.info(msg, e)?

Ok


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java, line 42
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065679#file1065679line42>
> >
> >     I think ANY_HOST = "ANY_HOST" is less confusing.

Ok


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java, lines 44-47
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065679#file1065679line44>
> >
> >     what is this comment for?

It is redundant. Let me remove it.


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java, line 55
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065679#file1065679line55>
> >
> >     Naming it requestQueue may be more descriptive. I'll leave it to you.

Changed it. I was afraid to change it previously because it was used in a lot of places. Now there are sufficient tests. So, I can change it without fear of breaking anything :)


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java, line 61
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065679#file1065679line61>
> >
> >     Maybe requests2StateMap/requests2CountMap ? just 2 cents.

Definitely sounds better. I renamed it to requestsToCountMap.


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java, lines 96-99
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065679#file1065679line96>
> >
> >     1. format the comment a little. :) using 
> >     /*
> >      *
> >      */
> >     
> >     2. this comment does not explain the code following it. Maybe this comment should be part of the allocatedContainers variable javadoc.

Fixed the formatting. 
The comment was intended to explain why we update allocatedContainers when only a containerRequest is made, so I think it is better to leave it here. I have reworded it to explain this better.


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java, line 113
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065679#file1065679line113>
> >
> >     just curious: is it possible that the hostname of the machine differs from the part of the HttpAddress before the ":"?

what do you mean by different? the format of HttpAddress is "$hostname:$port". We are only interested in the hostname here.
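For illustration, a hypothetical helper that takes the hostname from an address in the "$hostname:$port" format mentioned above. NodeAddress is an invented name, not something in this patch. As an alternative worth comparing, YARN's Container#getNodeId() returns a NodeId whose getHost() gives the host without any string parsing.

```java
/** Hypothetical helper: extracts the host part of a "$hostname:$port" address. */
class NodeAddress {
  static String hostOf(String httpAddress) {
    int colon = httpAddress.lastIndexOf(':');
    // No port present: treat the whole string as the hostname.
    return colon < 0 ? httpAddress : httpAddress.substring(0, colon);
  }
}
```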


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java, line 116
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065679#file1065679line116>
> >
> >     can lines 116 to 163 be refactored a little? Right now it looks very confusing to me (too many nested ifs).
> >     
> >     Is something like:
> >     
> >     if (requestCountOnThisHost > 0 && (allocatedContainersOnThisHost == null || allocatedContainersOnThisHost.size() < requestCountOnThisHost)) {
> >       addToAllocatedContainerList(hostName, container);
> >     } else {
> >       addToAllocatedContainerList(ANY_HOST, container);
> >     }
> >     
> >     sufficient for the logic?

Yeah, it should be sufficient. This is how I had it before. It became hard to debug because you didn't know why a container was allocated to a particular buffer. That's why I simplified the logic. I think for now it adds value to have detailed logging until we can make sure that the feature is stable. Do you think that is ok?
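One possible middle ground, sketched under assumptions: keep the single flat condition from the review comment, but record why each branch was taken so the detailed logging survives. BufferDecision and chooseBuffer are hypothetical names, and the reason strings stand in for whatever the real log messages would say.

```java
import java.util.List;

/** Hypothetical sketch: one flat decision, with the reason kept for logging. */
class BufferDecision {
  static final String ANY_HOST = "ANY_HOST";

  /**
   * Returns the buffer key (the host itself or ANY_HOST) for a newly allocated
   * container and appends a human-readable reason to {@code reason}.
   */
  static String chooseBuffer(String host, int requestCountOnHost,
                             List<?> allocatedOnHost, StringBuilder reason) {
    int buffered = allocatedOnHost == null ? 0 : allocatedOnHost.size();
    if (requestCountOnHost > 0 && buffered < requestCountOnHost) {
      reason.append("host ").append(host).append(" has ")
            .append(requestCountOnHost - buffered).append(" unmet request(s)");
      return host;
    }
    // Both "no request" and "already satisfied" fall through to ANY_HOST,
    // but the reason still tells them apart in the log.
    reason.append(requestCountOnHost == 0
        ? "no outstanding request for " + host
        : "requests on " + host + " already satisfied");
    return ANY_HOST;
  }
}
```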


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java, lines 135-137
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065679#file1065679line135>
> >
> >     nit: follow the block comment format when the comment cannot fit on one line. :)
> >     
> >     /*
> >      *
> >      */

Ok


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java, line 199
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065679#file1065679line199>
> >
> >     you mentioned "buffer" in a few places, which variable do you refer to?

"Buffer" is how I refer to it semantically. As for the variable name, it is still a list :)
"Buffer" refers to the list of allocated containers maintained in the allocatedContainers map.


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java, lines 231-232
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065679#file1065679line231>
> >
> >     why is this logic not in the hostAffinityEnabled clause?

Hm.. This logic should apply to the hostAffinityEnabled clause as well. I think this was a result of the refactoring. It also looks like all it does is logging, so I will remove it, as it doesn't serve much purpose.


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java, lines 267-273
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065679#file1065679line267>
> >
> >     why not just use allocatedContainers.get(host);

:) yes. You are right. I think this stems from before I added the fix in lines 96-99 for correlating data and null checks in the run loop.


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java, line 280
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065679#file1065679line280>
> >
> >     this seems not safe to me.

why? how do you suggest I change it?


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java, line 51
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065680#file1065680line51>
> >
> >     no _

Done!


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java, lines 79-80
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065680#file1065680line79>
> >
> >     it is worth differentiating the containerId and container.getId

Ok. I will rename containerId to samzaContainerId. Does that sound ok?


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java, line 178
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065680#file1065680line178>
> >
> >     wrap it in the log or the exception? If users are using the StreamAppender, they may lose that information.

Got it.


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java, line 196
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065680#file1065680line196>
> >
> >     same: wrap it

Done


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java, line 199
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065680#file1065680line199>
> >
> >     same: wrap it

Done


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java, line 41
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065681#file1065681line41>
> >
> >     no _

Done


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java, lines 60-68
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065681#file1065681line60>
> >
> >     actually both HostAwareContainerAllocator and ContainerAllocator share the same logic: if the value is null, use ANY_HOST. We may consider abstracting it into AbstractContainerAllocator. My 0.02.

I think I separated them to make sure that if a user switches to host-affinity and then decides to disable it, the host-affinity mapping will still be non-null. I don't think it affects the behavior or performance. Let me see if I can merge it into the abstract class.
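A minimal sketch of what a shared helper in the abstract base class could look like, assuming the locality data is available as a map from Samza container id to host. AllocatorBase and preferredHost are hypothetical names. A null map and a missing entry (for example, the first run of a host-affinity-enabled job) both resolve to ANY_HOST.

```java
import java.util.Map;

/** Hypothetical base class holding logic both allocators could share. */
abstract class AllocatorBase {
  static final String ANY_HOST = "ANY_HOST";

  /**
   * Preferred host for a Samza container: the last known host when locality
   * data exists, otherwise ANY_HOST.
   */
  static String preferredHost(Map<Integer, String> containerToHost, int samzaContainerId) {
    String host = containerToHost == null ? null : containerToHost.get(samzaContainerId);
    return host == null ? ANY_HOST : host;
  }
}
```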


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java, line 72
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065681#file1065681line72>
> >
> >     remove empty line

done


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java, line 77
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065681#file1065681line77>
> >
> >     you mean, ANY or ANY_HOST?

changed it


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java, line 84
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065681#file1065681line84>
> >
> >     the same, move to the end of the try clause?

Done


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java, line 86
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065681#file1065681line86>
> >
> >     is it possible to refactor this part a little to make it more readable? Currently there are too many nested if statements. Many conditions can be combined.

As before, nested ifs enable us to log better. In this case, the actions to be taken in each conditional clause don't overlap either. Let me give refactoring a try. I think I have sufficient unit tests to verify all changes.


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java, line 96
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065681#file1065681line96>
> >
> >     same confusion. getContainersOnAHost gives the allocated containers, not available containers, right?
> >     
> >     I guess getContainersOnAHost should talk to the RM to negotiate the possible resource?

Sorry about the confusion. getContainersOnAHost is just an accessor to the state we maintain in ContainerRequestState. We try to buffer all containers allocated by the RM in a map keyed by the hostname.


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java, line 142
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065681#file1065681line142>
> >
> >     do we define how many times we want to try before failing the whole job? If there are not sufficient resources, it is meaningless to keep retrying the request.

We don't keep track of how many times we request. If we request a specific host, we wait CONTAINER_REQUEST_TIMEOUT ms for the request to be satisfied. If the requested host is not allocated within that timeout, we choose from ANY_HOST. If there are insufficient resources in the cluster, the behavior is the same as today: the job does not fully start. I think we should handle this issue; however, it warrants a separate JIRA.
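The timeout rule above can be sketched as follows. PendingRequest and hostToUse are hypothetical names, and injecting the current time as a LongSupplier is an assumption made so the rule can be exercised deterministically; the timeout value plays the role of CONTAINER_REQUEST_TIMEOUT.

```java
import java.util.function.LongSupplier;

/** Hypothetical sketch of the per-request timeout check. */
class PendingRequest {
  final String preferredHost;
  final long requestTimeMs;

  PendingRequest(String preferredHost, long requestTimeMs) {
    this.preferredHost = preferredHost;
    this.requestTimeMs = requestTimeMs;
  }

  /** True once the preferred host has had timeoutMs to be allocated. */
  boolean isExpired(LongSupplier clock, long timeoutMs) {
    return clock.getAsLong() - requestTimeMs >= timeoutMs;
  }

  /**
   * Returns the preferred host if a container there is available, ANY_HOST once
   * the request has timed out, or null to mean "keep waiting".
   */
  String hostToUse(boolean preferredAvailable, LongSupplier clock, long timeoutMs) {
    if (preferredAvailable) {
      return preferredHost;
    }
    return isExpired(clock, timeoutMs) ? "ANY_HOST" : null;
  }
}
```

Note that nothing here counts retries: as described above, an unsatisfiable request simply falls back to ANY_HOST and keeps waiting there.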


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java, line 158
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065681#file1065681line158>
> >
> >     wrap it

done


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java, line 161
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065681#file1065681line161>
> >
> >     wrap it

done


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java, line 42
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065682#file1065682line42>
> >
> >     -> */

done


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java, line 50
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065682#file1065682line50>
> >
> >     -> */

done


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java, line 56
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065682#file1065682line56>
> >
> >     -> */

done


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java, line 57
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065682#file1065682line57>
> >
> >     if this is for the AM container id, maybe amContainerId makes more sense?

Yep


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java, line 115
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065682#file1065682line115>
> >
> >     to */

done


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java, line 120
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065682#file1065682line120>
> >
> >     to */

done


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java, line 130
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065682#file1065682line130>
> >
> >     to */

done


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java, line 140
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065682#file1065682line140>
> >
> >     since we now have two different container ids, one globally unique (ContainerStatus.getContainerId) and the other Samza's containerId (based on the container count), it may be worth differentiating the names. It confused me a little.

You are right. The existing code uses the same confusing nomenclature. I have tried to change the variable names to make it more obvious. I hope it is better after the refactoring.


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java, lines 39-42
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065683#file1065683line39>
> >
> >     format the comment to 
> >     /*
> >      *
> >      */

done


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java, lines 55-64
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065683#file1065683line55>
> >
> >     combine them into one if statement?

yes


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java, lines 69-71
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065683#file1065683line69>
> >
> >     I think the default core, memory, and priority values should not be here because they are already in YarnConfig. So we may want to pass those values from AbstractContainerAllocator instead of having them here.

This was just a handy constructor. Are you suggesting that I move the default definitions to AbstractContainerAllocator and refer to them in SamzaContainerRequest? Let me know if I understood your comment correctly.


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java, line 51
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065684#file1065684line51>
> >
> >     where is the callback handler? Is it the SamzaTaskManager ?

SamzaAppMaster is the callback handler because it extends the AMRMClientAsync.CallbackHandler interface. Even though the AppMaster class is registered as the callback handler implementation in line 84 of SamzaAppMaster.scala, for the sake of clarity the callbacks are delegated to methods in SamzaTaskManager.


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java, line 58
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065684#file1065684line58>
> >
> >     -> */

done


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java, line 61
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065684#file1065684line61>
> >
> >     remove _

done


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java, line 85
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065684#file1065684line85>
> >
> >     we usually have the config in a separate class. Maybe put it into the YarnConfig?

Makes sense.


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java, line 118
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065684#file1065684line118>
> >
> >     TODO

ok


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java, line 139
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065684#file1065684line139>
> >
> >     wrap it.

ok


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java, line 152
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065684#file1065684line152>
> >
> >     -> */

ok


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java, line 154
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065684#file1065684line154>
> >
> >     javadoc will help understand this method

ok


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java, lines 157-161
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065684#file1065684line157>
> >
> >     can this be simplified as 
> >     if (state.runningContainers.containsValue(containerStatus.getContainerId()), then ...

Yeah, but we are interested in the key for that entry. If you use an if-condition to check whether the value is present, how will you return the key? You won't have any handle/reference to the map entry. Also, we are not directly comparing the value; we are checking a field in the entry's value. Hence, an if statement is insufficient.
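A short sketch of the entry-set scan described above, using hypothetical stand-ins (RunningContainerLookup, ContainerInfo, and a String yarnContainerId) rather than the real state types. Iterating over entrySet gives access to both the key and the value's field, which containsValue cannot provide.

```java
import java.util.Map;
import java.util.Objects;

/** Hypothetical sketch: map lookup by a field of the value, returning the key. */
class RunningContainerLookup {
  /** Stand-in for the per-container info the AM keeps. */
  static final class ContainerInfo {
    final String yarnContainerId;

    ContainerInfo(String yarnContainerId) {
      this.yarnContainerId = yarnContainerId;
    }
  }

  /** Returns the Samza container id whose YARN container id matches, or -1 if none. */
  static int samzaIdFor(Map<Integer, ContainerInfo> running, String yarnContainerId) {
    for (Map.Entry<Integer, ContainerInfo> entry : running.entrySet()) {
      // Compare a field of the value, then hand back the entry's key.
      if (Objects.equals(entry.getValue().yarnContainerId, yarnContainerId)) {
        return entry.getKey();
      }
    }
    return -1;
  }
}
```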


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java, line 174
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065684#file1065684line174>
> >
> >     why is the containerId in the containerFailures list by default? It is possible that this containerId not in the list, right?

I think the variable name is misleading. containerFailures is a map of samzaContainerId -> ContainerFailure instance. A ContainerFailure instance encapsulates the last failure time and the number of times a samzaContainerId has failed.
It is possible that the containerId is not in the list. In that case, remove acts like a no-op.


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java, lines 183-185
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065684#file1065684line183>
> >
> >     from this logic, even if the container fails because of DISKS_FAILED or ABORTED, we will still hit the following log.info and execute lines 186-205. Is it supposed to work this way?

Yes. The behavior is supposed to be the same. I guess the log.info can use a little refactoring. It doesn't accommodate the yarn 2.6 ContainerExitStatus that we have added here.


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java, line 186
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065684#file1065684line186>
> >
> >     use the PREEMPTED instead of -100?

Made the log statement generic. Not sure how to get the name of the ExitStatus printed without maintaining a mapping. The ContainerExitStatus is not an enum, but a class with static variables.
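If printing the constant's name is still wanted, one option that avoids a hand-maintained mapping is to look up the public static int fields by reflection. The sketch below uses a hypothetical stand-in class, ExitStatusConstants, shaped like YARN's ContainerExitStatus; its values are believed to match YARN 2.6 but should be checked against the Hadoop version in use. Against the real class one would pass ContainerExitStatus.class instead.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

/** Hypothetical stand-in: static int constants, not an enum. */
class ExitStatusConstants {
  public static final int SUCCESS = 0;
  public static final int ABORTED = -100;
  public static final int DISKS_FAILED = -101;
  public static final int PREEMPTED = -102;
}

class ExitStatusNames {
  /** Returns the name of the public static int constant with this value, else the number. */
  static String nameOf(Class<?> constants, int value) {
    for (Field f : constants.getFields()) {
      int mods = f.getModifiers();
      if (Modifier.isStatic(mods) && f.getType() == int.class) {
        try {
          if (f.getInt(null) == value) {
            return f.getName();
          }
        } catch (IllegalAccessException e) {
          // Public fields should be readable; skip any that are not.
        }
      }
    }
    return String.valueOf(value); // unknown code: fall back to the number
  }
}
```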


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java, line 244
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065684#file1065684line244>
> >
> >     why not just give the currentFailCount and lastFailureTime default values? Then we can get rid of the "else" part. looks simpler.

Yeah. But that means we have to initialize the containerFailures map with each containerId and a default ContainerFailure instance. I think having an else statement is simpler.


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java, line 245
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065684#file1065684line245>
> >
> >     should be 1, because it is used in line 270

right.
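A sketch of the failure bookkeeping discussed in the last few comments, under assumptions: FailureTracker and its methods are hypothetical, and the map is keyed by samzaContainerId as described above. It keeps the else branch instead of pre-populating the map, counts the first failure as 1, and shows that removing an id that never failed is a no-op.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of per-container failure tracking. */
class FailureTracker {
  static final class ContainerFailure {
    final int count;
    final long lastFailureTimeMs;

    ContainerFailure(int count, long lastFailureTimeMs) {
      this.count = count;
      this.lastFailureTimeMs = lastFailureTimeMs;
    }
  }

  // samzaContainerId -> failure history; an absent key means "never failed"
  private final Map<Integer, ContainerFailure> containerFailures = new HashMap<>();

  /** Records a failure and returns the updated count; the first failure counts as 1. */
  int recordFailure(int samzaContainerId, long nowMs) {
    ContainerFailure previous = containerFailures.get(samzaContainerId);
    int count;
    if (previous != null) {
      count = previous.count + 1;
    } else {
      count = 1; // first failure, so the map never needs default entries
    }
    containerFailures.put(samzaContainerId, new ContainerFailure(count, nowMs));
    return count;
  }

  /** Forgets the history, e.g. after a clean exit; a no-op if the id never failed. */
  void clear(int samzaContainerId) {
    containerFailures.remove(samzaContainerId);
  }
}
```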


> On Sept. 11, 2015, 1:47 a.m., Yan Fang wrote:
> > samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala, line 74
> > <https://reviews.apache.org/r/37817/diff/5/?file=1065693#file1065693line74>
> >
> >     it should be amJavaHome.isEmpty(), because we set "" as the default value in YarnConfig.

We are setting default as "null" in YarnConfig in Line 149, aren't we ?


- Navina


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37817/#review98316
-----------------------------------------------------------


On Sept. 9, 2015, 12:28 a.m., Navina Ramesh wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/37817/
> -----------------------------------------------------------
> 
> (Updated Sept. 9, 2015, 12:28 a.m.)
> 
> 
> Review request for samza, Yan Fang, Chinmay Soman, Chris Riccomini, and Yi Pan (Data Infrastructure).
> 
> 
> Bugs: SAMZA-619
>     https://issues.apache.org/jira/browse/SAMZA-619
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> This is a major change to how we request and assign containers to resources. It uses a threaded model for request and allocation. More comments in the javadoc. 
> 
> Major changes to note:
> 1. Changed yarn dependency to 2.6
> 2. Moved YarnConfig to java config class
> 3. Removed SamzaAppMasterTaskManager
> 4. SamzaAppState replaces SamzaAppMasterState
> 5. SamzaTaskManager is very similar to SamzaAppMasterTaskManager, except that it delegates onContainerAllocated and requestContainers to the thread in ContainerAllocator 
> 6. Removed state.unclaimedContainers
> 7. Allocator thread sleep time and request timeout are configurable
> 8. host-affinity can be enabled by using the config "yarn.samza.host-affinity.enabled". Host affinity is guaranteed to work with FairScheduler that has continuous scheduling enabled. Details on this config can be found at SAMZA-617 [design](https://issues.apache.org/jira/secure/attachment/12726945/DESIGN-SAMZA-617-2.pdf).
> 9. Added unit tests for TaskManager & ContainerAllocator 
> 10. Updated config documentation
> 
> Pending items:
> 1. Update web-site with info on this feature (SAMZA-668)
> 
> 
> Diffs
> -----
> 
>   build.gradle 3a7fabcd4f4e5c8db950c3a33bba33618c5565b4 
>   docs/learn/documentation/versioned/jobs/configuration-table.html c23d7d37f151a0dbdd71f52588773bc67edf88c8 
>   gradle/dependency-versions.gradle 36d564b6ca895f042ee4802643e49180f4947b62 
>   samza-core/src/main/java/org/apache/samza/container/LocalityManager.java c567bf4a1747a7a6aff23be74cf25b1a44e57b42 
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java 5455881e2a4a9c865137f2708f655641b93c2bfb 
>   samza-core/src/main/java/org/apache/samza/job/model/JobModel.java 7b592740c082225fc217a24188bc874f883db63b 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 61e228b1ff8619da4ad2e11b15a5afb7999c4996 
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala c8438bd6ed01a95860867505d84c04c493a5a197 
>   samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerFailure.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java PRE-CREATION 
>   samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml ce2145adde09b0e8097a1c711bc296ee20392e63 
>   samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala 1f51551893c42bb13d7941c8b0e5594ac7f42585 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala af42c6a6636953a95f79837fe372e0dbd735df70 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala d475c4c566f17f88a04db5fbc84cc2e27eb333d2 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala 03acfe1bbbabf8f54be9f36fdae785476da45135 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala 060538623e4d67b986bc635518e7fe8ebdde9e24 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala f667c83a7fff43a5efdc64cde019b2cf35f38cb9 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala 1743c8611a94fa1c7f7dafd8ff8c713039f3df8c 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala 8dd70c9977473e083e463d01b049c40e15b21f4a 
>   samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala 09f4dc32a4b18aeb3accb856c360ea2f95c82673 
>   samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala 7fd51221149ba59e7ea1bc0b51d58726ec17a7d7 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerRequestState.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaContainerRequest.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerAllocator.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerUtil.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockHttpServer.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockNMClient.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/util/TestAMRMClientImpl.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/util/TestUtil.java PRE-CREATION 
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala df5992e659302d2918c4e2c30b6122ed51ab9fe8 
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala 6f4bfaf916d687426f746fd3b09cd56490bb500e 
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 2eec65f02826de40493925c08ff344a8cc4feecb 
> 
> Diff: https://reviews.apache.org/r/37817/diff/
> 
> 
> Testing
> -------
> 
> Tested with hello-samza locally. Tested with a sample job on a 3 node Yarn cluster. Works as expected.
> 
> 
> Thanks,
> 
> Navina Ramesh
> 
>


Re: Review Request 37817: SAMZA-619 - Modify SamzaAppMaster to enable host-affinity

Posted by Yan Fang <ya...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37817/#review98316
-----------------------------------------------------------



samza-core/src/main/java/org/apache/samza/job/model/JobModel.java (lines 65 - 72)
<https://reviews.apache.org/r/37817/#comment155027>

    duplicated code with line 116 - 122



samza-core/src/main/scala/org/apache/samza/util/Util.scala (line 44)
<https://reviews.apache.org/r/37817/#comment154725>

    not needed: 1) it is very short already; 2) sometimes you are using clock and sometimes calling System.currentTimeMillis. Why not just call System.currentTimeMillis everywhere?



samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java (line 35)
<https://reviews.apache.org/r/37817/#comment154682>

    if we are forced to implement the Runnable anyway, why not implement the interface in this abstract class?



samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java (lines 67 - 68)
<https://reviews.apache.org/r/37817/#comment154681>

    To be more precise, when a host-affinity-enabled job runs for the first time, the locality value is null too.



samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java (line 89)
<https://reviews.apache.org/r/37817/#comment154997>

    -> */



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java (line 37)
<https://reviews.apache.org/r/37817/#comment154683>

    just follow the same code style: start the variable with a letter



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java (lines 48 - 49)
<https://reviews.apache.org/r/37817/#comment154685>

    



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java (line 69)
<https://reviews.apache.org/r/37817/#comment154688>

    maybe it's better to put the sleep after containerRequestState.releaseExtraContainers(), because there is no reason to sleep at the very beginning.
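A sketch of the loop ordering suggested in this comment, combined with the earlier suggestion to implement Runnable in the abstract class. AllocatorLoop, runOnce, and the two abstract hooks are hypothetical names standing in for the allocator's real methods; each iteration does its work first and sleeps afterwards.

```java
/** Hypothetical sketch: the base class owns the Runnable loop; subclasses supply the steps. */
abstract class AllocatorLoop implements Runnable {
  private final long sleepMs;
  private volatile boolean running = true;

  AllocatorLoop(long sleepMs) {
    this.sleepMs = sleepMs;
  }

  /** Match buffered containers to pending requests (host-aware or not). */
  protected abstract void assignContainerRequests();

  /** Give back containers that no request needs. */
  protected abstract void releaseExtraContainers();

  /** One iteration: do the work first, then sleep. */
  final void runOnce() throws InterruptedException {
    assignContainerRequests();
    releaseExtraContainers();
    Thread.sleep(sleepMs);
  }

  @Override
  public void run() {
    while (running) {
      try {
        runOnce();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt and exit
        return;
      }
    }
  }

  void stop() {
    running = false;
  }
}
```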



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java (line 71)
<https://reviews.apache.org/r/37817/#comment155024>

    getContainersOnAHost returns allocatedContainers, not availableContainers. So what is the difference between allocated containers and available containers?



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java (line 80)
<https://reviews.apache.org/r/37817/#comment155000>

    this getId is for the global container Id, right?



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java (line 84)
<https://reviews.apache.org/r/37817/#comment155025>

    I do not understand this comment. Why would there be any extra containers? Do we request more containers than needed? Or is this only for host-affinity-enabled jobs?



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java (lines 88 - 89)
<https://reviews.apache.org/r/37817/#comment154687>

    change to log.info(msg, e)?



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java (line 41)
<https://reviews.apache.org/r/37817/#comment154689>

    remove _



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java (line 42)
<https://reviews.apache.org/r/37817/#comment154696>

    I think ANY_HOST = "ANY_HOST" is less confusing.



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java (lines 44 - 47)
<https://reviews.apache.org/r/37817/#comment154691>

    what is this comment for?



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java (line 55)
<https://reviews.apache.org/r/37817/#comment154692>

    renaming it to requestQueue may be more descriptive. Leave it to you.



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java (line 61)
<https://reviews.apache.org/r/37817/#comment154693>

    Maybe requests2StateMap/requests2CountMap ? just 2 cents.



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java (lines 96 - 99)
<https://reviews.apache.org/r/37817/#comment154694>

    1. format the comment a little. :) using 
    /*
     *
     */
    
    2. this comment does not explain the code following it. Maybe this comment should be part of the allocatedContainers variable javadoc.



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java (line 113)
<https://reviews.apache.org/r/37817/#comment154695>

    just curious: is it possible that the hostname of the machine differs from the part of the HttpAddress before the ":"?



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java (line 116)
<https://reviews.apache.org/r/37817/#comment154698>

    Can lines 116 to 163 be refactored a little? Right now they look very confusing to me (too many nested ifs).
    
    Is something like:
    
    if (requestCountOnThisHost > 0
        && (allocatedContainersOnThisHost == null
            || allocatedContainersOnThisHost.size() < requestCountOnThisHost)) {
      addToAllocatedContainerList(hostName, container);
    } else {
      addToAllocatedContainerList(ANY_HOST, container);
    }
    
    sufficient for the logic?
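A minimal, self-contained sketch of the single combined condition suggested above. It assumes containers are tracked as plain String IDs, and the names AllocationSketch, requestsToCountMap and addAllocatedContainer are hypothetical stand-ins, not the actual ContainerRequestState fields:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the allocation bookkeeping discussed above.
// Containers are plain String IDs here instead of YARN Container objects.
class AllocationSketch {
  static final String ANY_HOST = "ANY_HOST";

  // Outstanding request count per preferred host (hypothetical name).
  final Map<String, Integer> requestsToCountMap = new HashMap<>();
  // Containers allocated so far, bucketed by host name or ANY_HOST.
  final Map<String, List<String>> allocatedContainers = new HashMap<>();

  /** Files a new container under its host if that host still has unmet requests, else under ANY_HOST. */
  void addAllocatedContainer(String hostName, String containerId) {
    int requestCountOnThisHost = requestsToCountMap.getOrDefault(hostName, 0);
    List<String> allocatedOnThisHost = allocatedContainers.get(hostName);

    if (requestCountOnThisHost > 0
        && (allocatedOnThisHost == null || allocatedOnThisHost.size() < requestCountOnThisHost)) {
      addToAllocatedContainerList(hostName, containerId);
    } else {
      addToAllocatedContainerList(ANY_HOST, containerId);
    }
  }

  private void addToAllocatedContainerList(String host, String containerId) {
    allocatedContainers.computeIfAbsent(host, h -> new ArrayList<>()).add(containerId);
  }
}
```

Whether this is equivalent to the nested version depends on whether the original lines handle any extra cases, which is exactly the open question in the comment.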



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java (line 124)
<https://reviews.apache.org/r/37817/#comment155013>

    this is a little verbose. (just my opinion. :)



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java (lines 135 - 137)
<https://reviews.apache.org/r/37817/#comment154697>

    nit: follow the block comment format when a comment cannot fit on one line. :)
    
    /*
     *
     */



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java (line 199)
<https://reviews.apache.org/r/37817/#comment155007>

    You mentioned "buffer" in a few places; which variable are you referring to?



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java (lines 231 - 232)
<https://reviews.apache.org/r/37817/#comment155008>

    why is this logic not in the hostAffinityEnabled clause?



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java (lines 267 - 273)
<https://reviews.apache.org/r/37817/#comment154699>

    why not just use allocatedContainers.get(host);



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java (line 276)
<https://reviews.apache.org/r/37817/#comment155020>

    this is not safe. Maybe return an unmodifiable version?
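One way to act on this, sketched with hypothetical names (RequestStateView, add) and String container IDs: hand callers a read-only view of a host's containers instead of the internal list. Only the method name getContainersOnAHost comes from this review; its signature here is an assumption.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical holder; field and method signatures are illustrative only.
class RequestStateView {
  private final Map<String, List<String>> allocatedContainers = new HashMap<>();

  synchronized void add(String host, String containerId) {
    allocatedContainers.computeIfAbsent(host, h -> new ArrayList<>()).add(containerId);
  }

  /** Returns a read-only view of the containers on a host, or null if none were allocated there. */
  synchronized List<String> getContainersOnAHost(String host) {
    List<String> containers = allocatedContainers.get(host);
    return containers == null ? null : Collections.unmodifiableList(containers);
  }
}
```

Collections.unmodifiableList only blocks writes through the returned reference; it is still a live view, so a caller iterating it while the allocator thread mutates the underlying list can still hit ConcurrentModificationException. Returning a copy (new ArrayList<>(containers)) avoids that at the cost of an allocation per call.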



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java (line 280)
<https://reviews.apache.org/r/37817/#comment155022>

    This does not seem safe to me.



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java (line 51)
<https://reviews.apache.org/r/37817/#comment154700>

    no _



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java (lines 79 - 80)
<https://reviews.apache.org/r/37817/#comment154706>

    It is worth differentiating between containerId and container.getId.



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java (line 178)
<https://reviews.apache.org/r/37817/#comment154701>

    Wrap it in the log or the exception? If users are using the StreamAppender, they may otherwise lose that information.



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java (line 196)
<https://reviews.apache.org/r/37817/#comment154702>

    same: wrap it



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java (line 199)
<https://reviews.apache.org/r/37817/#comment154703>

    same: wrap it



samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java (line 41)
<https://reviews.apache.org/r/37817/#comment154707>

    no _



samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java (lines 60 - 68)
<https://reviews.apache.org/r/37817/#comment155029>

    Actually, both HostAwareContainerAllocator and ContainerAllocator share the same logic: if the value is null, use ANY_HOST. We may consider abstracting it into AbstractContainerAllocator. My 0.02.
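A rough sketch of what pulling the shared "null means ANY_HOST" rule into the base class could look like, as a template method. All class and method names here are hypothetical, and requests are recorded as "id@host" strings purely for illustration; this is not the patch's AbstractContainerAllocator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical base class: the null-to-ANY_HOST resolution lives here once.
abstract class AbstractAllocatorSketch {
  static final String ANY_HOST = "ANY_HOST";

  // Requests issued so far, recorded as "containerId@host" strings for illustration.
  final List<String> issuedRequests = new ArrayList<>();

  /** Shared logic: a missing (null) host preference becomes ANY_HOST before the subclass sees it. */
  void requestContainers(Map<Integer, String> containerToHostMapping) {
    for (Map.Entry<Integer, String> entry : containerToHostMapping.entrySet()) {
      String host = entry.getValue() == null ? ANY_HOST : entry.getValue();
      requestContainer(entry.getKey(), host);
    }
  }

  /** Subclass hook: issue one request for the given Samza container id. */
  abstract void requestContainer(int containerId, String preferredHost);
}

/** Without host affinity, preferences are ignored and every request is for ANY_HOST. */
class PlainAllocatorSketch extends AbstractAllocatorSketch {
  @Override
  void requestContainer(int containerId, String preferredHost) {
    issuedRequests.add(containerId + "@" + ANY_HOST);
  }
}

/** With host affinity, the already-resolved preference is passed through. */
class HostAwareAllocatorSketch extends AbstractAllocatorSketch {
  @Override
  void requestContainer(int containerId, String preferredHost) {
    issuedRequests.add(containerId + "@" + preferredHost);
  }
}
```

Neither subclass needs its own null check, which is the duplication the comment points at.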



samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java (line 72)
<https://reviews.apache.org/r/37817/#comment154709>

    remove empty line



samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java (line 77)
<https://reviews.apache.org/r/37817/#comment155030>

    you mean, ANY or ANY_HOST?



samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java (line 84)
<https://reviews.apache.org/r/37817/#comment154710>

    the same, move to the end of the try clause?



samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java (line 86)
<https://reviews.apache.org/r/37817/#comment154717>

    Is it possible to refactor this part a little to make it more readable? Currently there are too many nested if statements, and many of the conditions can be combined.



samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java (line 96)
<https://reviews.apache.org/r/37817/#comment155032>

    Same confusion: getContainersOnAHost gives the allocated containers, not the available containers, right?
    
    I guess getContainersOnAHost should talk to the RM to negotiate the possible resource?



samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java (line 142)
<https://reviews.apache.org/r/37817/#comment154721>

    Do we define how many times we want to retry before failing the whole job? If there are not sufficient resources, it is meaningless to keep retrying the request.



samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java (line 158)
<https://reviews.apache.org/r/37817/#comment154714>

    wrap it



samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java (line 161)
<https://reviews.apache.org/r/37817/#comment154715>

    wrap it



samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java (line 42)
<https://reviews.apache.org/r/37817/#comment154799>

    -> */



samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java (line 50)
<https://reviews.apache.org/r/37817/#comment154800>

    -> */



samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java (line 56)
<https://reviews.apache.org/r/37817/#comment154802>

    -> */



samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java (line 57)
<https://reviews.apache.org/r/37817/#comment154803>

    if this is for the AM container id, maybe amContainerId makes more sense?



samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java (line 115)
<https://reviews.apache.org/r/37817/#comment154718>

    to */



samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java (line 120)
<https://reviews.apache.org/r/37817/#comment154719>

    to */



samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java (line 130)
<https://reviews.apache.org/r/37817/#comment154720>

    to */



samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java (line 140)
<https://reviews.apache.org/r/37817/#comment154974>

    Since we now have two different containerIds, one globally unique (ContainerStatus.getContainerId) and the other Samza's containerId (based on the container count), it may be worth differentiating the names. It confused me a little.



samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java (lines 39 - 42)
<https://reviews.apache.org/r/37817/#comment154723>

    format the comment to 
    /*
     *
     */



samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java (lines 55 - 64)
<https://reviews.apache.org/r/37817/#comment154724>

    combine them into one if statement?



samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java (lines 69 - 71)
<https://reviews.apache.org/r/37817/#comment155034>

    I think the default core, memory, and priority values should not be here, because they are already in YarnConfig. So we may want to pass those values from AbstractContainerAllocator instead of having them here.



samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java (line 51)
<https://reviews.apache.org/r/37817/#comment155035>

    where is the callback handler? Is it the SamzaTaskManager ?



samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java (line 58)
<https://reviews.apache.org/r/37817/#comment154967>

    -> */



samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java (line 61)
<https://reviews.apache.org/r/37817/#comment154968>

    remove _



samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java (line 85)
<https://reviews.apache.org/r/37817/#comment154969>

    we usually have the config in a separate class. Maybe put it into the YarnConfig?



samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java (line 118)
<https://reviews.apache.org/r/37817/#comment154970>

    TODO



samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java (line 139)
<https://reviews.apache.org/r/37817/#comment154971>

    wrap it.



samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java (line 152)
<https://reviews.apache.org/r/37817/#comment154972>

    -> */



samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java (line 154)
<https://reviews.apache.org/r/37817/#comment154973>

    A javadoc would help explain this method.



samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java (lines 157 - 161)
<https://reviews.apache.org/r/37817/#comment154976>

    can this be simplified as 
    if (state.runningContainers.containsValue(containerStatus.getContainerId()), then ...



samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java (line 174)
<https://reviews.apache.org/r/37817/#comment154977>

    Why is the containerId in the containerFailures list by default? It is possible that this containerId is not in the list, right?



samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java (lines 183 - 185)
<https://reviews.apache.org/r/37817/#comment154980>

    From this logic, even if the container fails because of DISKS_FAILED or ABORTED, we will still reach the following log.info and execute lines 186-205. Is that intended?



samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java (line 186)
<https://reviews.apache.org/r/37817/#comment154981>

    use the PREEMPTED instead of -100?



samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java (line 244)
<https://reviews.apache.org/r/37817/#comment154989>

    Why not just give currentFailCount and lastFailureTime default values? Then we can get rid of the "else" part, which looks simpler.



samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java (line 245)
<https://reviews.apache.org/r/37817/#comment154986>

    Should be 1, because it is used in line 270.
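A sketch of the default-value approach from the two comments above. It uses a hypothetical local FailureRecord type rather than Samza's ContainerFailure, and it deliberately ignores any retry-window logic the real method may have:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical failure bookkeeping; FailureRecord is a local stand-in, not Samza's ContainerFailure.
class FailureTrackerSketch {
  static final class FailureRecord {
    final int count;
    final long lastFailureTime;

    FailureRecord(int count, long lastFailureTime) {
      this.count = count;
      this.lastFailureTime = lastFailureTime;
    }
  }

  // Default used when a container has never failed: no separate "else" branch is needed.
  private static final FailureRecord NO_FAILURE = new FailureRecord(0, 0L);

  final Map<Integer, FailureRecord> containerFailures = new HashMap<>();

  /** Records a failure and returns the updated count; the first failure yields 1. */
  int recordFailure(int containerId, long now) {
    FailureRecord previous = containerFailures.getOrDefault(containerId, NO_FAILURE);
    int currentFailCount = previous.count + 1;
    containerFailures.put(containerId, new FailureRecord(currentFailCount, now));
    return currentFailCount;
  }
}
```

With a stored default of 0, the first failure naturally produces a count of 1, which matches the value the second comment asks for.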



samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala (line 69)
<https://reviews.apache.org/r/37817/#comment154728>

    It should be amJavaHome.isEmpty(), because we set "" as the default value in YarnConfig.
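A small Java illustration of why a null check alone is not enough when the configured default is the empty string. The method name javaCommand and the plain "java" fallback are hypothetical, not YarnJob's actual behavior:

```java
// Hypothetical helper: "" (the config default) and null both mean "not configured".
class JavaHomeCheckSketch {
  /** Picks the java binary for the AM command line. */
  static String javaCommand(String amJavaHome) {
    // A null check alone would treat the "" default as configured and produce "/bin/java".
    if (amJavaHome == null || amJavaHome.isEmpty()) {
      return "java";
    }
    return amJavaHome + "/bin/java";
  }
}
```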


- Yan Fang


On Sept. 9, 2015, 12:28 a.m., Navina Ramesh wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/37817/
> -----------------------------------------------------------
> 
> (Updated Sept. 9, 2015, 12:28 a.m.)
> 
> 
> Review request for samza, Yan Fang, Chinmay Soman, Chris Riccomini, and Yi Pan (Data Infrastructure).
> 
> 
> Bugs: SAMZA-619
>     https://issues.apache.org/jira/browse/SAMZA-619
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> This is a major change to how we request and assign containers to resources. It uses a threaded model for request and allocation. More details are in the javadoc.
> 
> Major changes to note:
> 1. Changed YARN dependency to 2.6
> 2. Moved YarnConfig to a Java config class
> 3. Removed SamzaAppMasterTaskManager
> 4. SamzaAppState replaces SamzaAppMasterState
> 5. SamzaTaskManager is very similar to SamzaAppMasterTaskManager, except that it delegates onContainerAllocated and requestContainers to the thread in ContainerAllocator
> 6. Removed state.unclaimedContainers
> 7. Allocator thread sleep time and request timeout are configurable
> 8. Host affinity can be enabled using the config "yarn.samza.host-affinity.enabled". Host affinity is guaranteed to work with the FairScheduler when continuous scheduling is enabled. Details on this config can be found in the SAMZA-617 [design](https://issues.apache.org/jira/secure/attachment/12726945/DESIGN-SAMZA-617-2.pdf).
> 9. Added unit tests for TaskManager & ContainerAllocator
> 10. Updated config documentation
> 
> Pending items:
> 1. Update web-site with info on this feature (SAMZA-668)
> 
> 
> Diffs
> -----
> 
>   build.gradle 3a7fabcd4f4e5c8db950c3a33bba33618c5565b4 
>   docs/learn/documentation/versioned/jobs/configuration-table.html c23d7d37f151a0dbdd71f52588773bc67edf88c8 
>   gradle/dependency-versions.gradle 36d564b6ca895f042ee4802643e49180f4947b62 
>   samza-core/src/main/java/org/apache/samza/container/LocalityManager.java c567bf4a1747a7a6aff23be74cf25b1a44e57b42 
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java 5455881e2a4a9c865137f2708f655641b93c2bfb 
>   samza-core/src/main/java/org/apache/samza/job/model/JobModel.java 7b592740c082225fc217a24188bc874f883db63b 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 61e228b1ff8619da4ad2e11b15a5afb7999c4996 
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala c8438bd6ed01a95860867505d84c04c493a5a197 
>   samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerFailure.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java PRE-CREATION 
>   samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml ce2145adde09b0e8097a1c711bc296ee20392e63 
>   samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala 1f51551893c42bb13d7941c8b0e5594ac7f42585 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala af42c6a6636953a95f79837fe372e0dbd735df70 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala d475c4c566f17f88a04db5fbc84cc2e27eb333d2 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala 03acfe1bbbabf8f54be9f36fdae785476da45135 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala 060538623e4d67b986bc635518e7fe8ebdde9e24 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala f667c83a7fff43a5efdc64cde019b2cf35f38cb9 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala 1743c8611a94fa1c7f7dafd8ff8c713039f3df8c 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala 8dd70c9977473e083e463d01b049c40e15b21f4a 
>   samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala 09f4dc32a4b18aeb3accb856c360ea2f95c82673 
>   samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala 7fd51221149ba59e7ea1bc0b51d58726ec17a7d7 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerRequestState.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaContainerRequest.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerAllocator.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerUtil.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockHttpServer.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockNMClient.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/util/TestAMRMClientImpl.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/util/TestUtil.java PRE-CREATION 
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala df5992e659302d2918c4e2c30b6122ed51ab9fe8 
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala 6f4bfaf916d687426f746fd3b09cd56490bb500e 
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 2eec65f02826de40493925c08ff344a8cc4feecb 
> 
> Diff: https://reviews.apache.org/r/37817/diff/
> 
> 
> Testing
> -------
> 
> Tested with hello-samza locally. Tested with a sample job on a 3-node YARN cluster. Works as expected.
> 
> 
> Thanks,
> 
> Navina Ramesh
> 
>


Re: Review Request 37817: SAMZA-619 - Modify SamzaAppMaster to enable host-affinity

Posted by "Yi Pan (Data Infrastructure)" <yi...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37817/#review100640
-----------------------------------------------------------

Ship it!


Ship It!

- Yi Pan (Data Infrastructure)


On Sept. 23, 2015, 3:59 a.m., Navina Ramesh wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/37817/
> -----------------------------------------------------------
> 
> (Updated Sept. 23, 2015, 3:59 a.m.)
> 
> 
> Review request for samza, Yan Fang, Chinmay Soman, Chris Riccomini, and Yi Pan (Data Infrastructure).
> 
> 
> Bugs: SAMZA-619
>     https://issues.apache.org/jira/browse/SAMZA-619
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> This is a major change to how we request and assign containers to resources. It uses a threaded model for request and allocation. More details are in the javadoc.
> 
> Major changes to note:
> 1. Changed YARN dependency to 2.6
> 2. Moved YarnConfig to a Java config class
> 3. Removed SamzaAppMasterTaskManager
> 4. SamzaAppState replaces SamzaAppMasterState
> 5. SamzaTaskManager is very similar to SamzaAppMasterTaskManager, except that it delegates onContainerAllocated and requestContainers to the thread in ContainerAllocator
> 6. Removed state.unclaimedContainers
> 7. Allocator thread sleep time and request timeout are configurable
> 8. Host affinity can be enabled using the config "yarn.samza.host-affinity.enabled". Host affinity is guaranteed to work with the FairScheduler when continuous scheduling is enabled. Details on this config can be found in the SAMZA-617 [design](https://issues.apache.org/jira/secure/attachment/12726945/DESIGN-SAMZA-617-2.pdf).
> 9. Added unit tests for TaskManager & ContainerAllocator
> 10. Updated config documentation
> 11. Removed TestSamzaAppMasterTaskManager.scala
> 
> Pending items:
> 1. Update web-site with info on this feature (SAMZA-668)
> 
> 
> Diffs
> -----
> 
>   build.gradle 3a7fabcd4f4e5c8db950c3a33bba33618c5565b4 
>   docs/learn/documentation/versioned/jobs/configuration-table.html c23d7d37f151a0dbdd71f52588773bc67edf88c8 
>   gradle/dependency-versions.gradle 36d564b6ca895f042ee4802643e49180f4947b62 
>   samza-core/src/main/java/org/apache/samza/container/LocalityManager.java c567bf4a1747a7a6aff23be74cf25b1a44e57b42 
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java 5455881e2a4a9c865137f2708f655641b93c2bfb 
>   samza-core/src/main/java/org/apache/samza/job/model/JobModel.java 7b592740c082225fc217a24188bc874f883db63b 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 61e228b1ff8619da4ad2e11b15a5afb7999c4996 
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala c8438bd6ed01a95860867505d84c04c493a5a197 
>   samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerFailure.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java PRE-CREATION 
>   samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml ce2145adde09b0e8097a1c711bc296ee20392e63 
>   samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala 1f51551893c42bb13d7941c8b0e5594ac7f42585 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala af42c6a6636953a95f79837fe372e0dbd735df70 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala d475c4c566f17f88a04db5fbc84cc2e27eb333d2 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala 03acfe1bbbabf8f54be9f36fdae785476da45135 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala 060538623e4d67b986bc635518e7fe8ebdde9e24 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala f667c83a7fff43a5efdc64cde019b2cf35f38cb9 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala 1743c8611a94fa1c7f7dafd8ff8c713039f3df8c 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala 8dd70c9977473e083e463d01b049c40e15b21f4a 
>   samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala 09f4dc32a4b18aeb3accb856c360ea2f95c82673 
>   samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala 7fd51221149ba59e7ea1bc0b51d58726ec17a7d7 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerRequestState.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaContainerRequest.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerAllocator.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerUtil.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockHttpServer.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockNMClient.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/util/TestAMRMClientImpl.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/util/TestUtil.java PRE-CREATION 
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMaster.scala 7b7d86a43c69e72c47eaa91f68be24e0f4022891 
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala df5992e659302d2918c4e2c30b6122ed51ab9fe8 
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala 6f4bfaf916d687426f746fd3b09cd56490bb500e 
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 2eec65f02826de40493925c08ff344a8cc4feecb 
> 
> Diff: https://reviews.apache.org/r/37817/diff/
> 
> 
> Testing
> -------
> 
> Tested with hello-samza locally. Tested with a sample job on a 3-node YARN cluster. Works as expected.
> 
> 
> Thanks,
> 
> Navina Ramesh
> 
>


Re: Review Request 37817: SAMZA-619 - Modify SamzaAppMaster to enable host-affinity

Posted by Navina Ramesh <nr...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37817/
-----------------------------------------------------------

(Updated Sept. 23, 2015, 3:59 a.m.)


Review request for samza, Yan Fang, Chinmay Soman, Chris Riccomini, and Yi Pan (Data Infrastructure).


Bugs: SAMZA-619
    https://issues.apache.org/jira/browse/SAMZA-619


Repository: samza


Description
-------

This is a major change to how we request and assign containers to resources. It uses a threaded model for request and allocation. More details are in the javadoc.

Major changes to note:
1. Changed YARN dependency to 2.6
2. Moved YarnConfig to a Java config class
3. Removed SamzaAppMasterTaskManager
4. SamzaAppState replaces SamzaAppMasterState
5. SamzaTaskManager is very similar to SamzaAppMasterTaskManager, except that it delegates onContainerAllocated and requestContainers to the thread in ContainerAllocator
6. Removed state.unclaimedContainers
7. Allocator thread sleep time and request timeout are configurable
8. Host affinity can be enabled using the config "yarn.samza.host-affinity.enabled". Host affinity is guaranteed to work with the FairScheduler when continuous scheduling is enabled. Details on this config can be found in the SAMZA-617 [design](https://issues.apache.org/jira/secure/attachment/12726945/DESIGN-SAMZA-617-2.pdf).
9. Added unit tests for TaskManager & ContainerAllocator
10. Updated config documentation
11. Removed TestSamzaAppMasterTaskManager.scala
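A minimal sketch of reading the host-affinity flag through a dedicated config class. YarnConfigSketch is a hypothetical stand-in backed by a plain Map rather than Samza's Config, and the "false" default is an assumption, not something taken from this patch; only the key name "yarn.samza.host-affinity.enabled" comes from the description.

```java
import java.util.Map;

// Hypothetical stand-in for a YarnConfig-style class; not the patch's YarnConfig.
class YarnConfigSketch {
  static final String HOST_AFFINITY_ENABLED = "yarn.samza.host-affinity.enabled";

  private final Map<String, String> config;

  YarnConfigSketch(Map<String, String> config) {
    this.config = config;
  }

  /** Host affinity is treated as off unless the key is explicitly set to "true" (assumed default). */
  boolean getHostAffinityEnabled() {
    return Boolean.parseBoolean(config.getOrDefault(HOST_AFFINITY_ENABLED, "false"));
  }
}
```

Keeping the key and its default in one class means the allocator code only calls getHostAffinityEnabled() and never repeats the string literal.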

Pending items:
1. Update web-site with info on this feature (SAMZA-668)


Diffs (updated)
-----

  build.gradle 3a7fabcd4f4e5c8db950c3a33bba33618c5565b4 
  docs/learn/documentation/versioned/jobs/configuration-table.html c23d7d37f151a0dbdd71f52588773bc67edf88c8 
  gradle/dependency-versions.gradle 36d564b6ca895f042ee4802643e49180f4947b62 
  samza-core/src/main/java/org/apache/samza/container/LocalityManager.java c567bf4a1747a7a6aff23be74cf25b1a44e57b42 
  samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java 5455881e2a4a9c865137f2708f655641b93c2bfb 
  samza-core/src/main/java/org/apache/samza/job/model/JobModel.java 7b592740c082225fc217a24188bc874f883db63b 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 61e228b1ff8619da4ad2e11b15a5afb7999c4996 
  samza-core/src/main/scala/org/apache/samza/util/Util.scala c8438bd6ed01a95860867505d84c04c493a5a197 
  samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerFailure.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java PRE-CREATION 
  samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml ce2145adde09b0e8097a1c711bc296ee20392e63 
  samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala 1f51551893c42bb13d7941c8b0e5594ac7f42585 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala af42c6a6636953a95f79837fe372e0dbd735df70 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala d475c4c566f17f88a04db5fbc84cc2e27eb333d2 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala 03acfe1bbbabf8f54be9f36fdae785476da45135 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala 060538623e4d67b986bc635518e7fe8ebdde9e24 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala f667c83a7fff43a5efdc64cde019b2cf35f38cb9 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala 1743c8611a94fa1c7f7dafd8ff8c713039f3df8c 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala 8dd70c9977473e083e463d01b049c40e15b21f4a 
  samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala 09f4dc32a4b18aeb3accb856c360ea2f95c82673 
  samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala 7fd51221149ba59e7ea1bc0b51d58726ec17a7d7 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerRequestState.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaContainerRequest.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerAllocator.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerUtil.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockHttpServer.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockNMClient.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/util/TestAMRMClientImpl.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/util/TestUtil.java PRE-CREATION 
  samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMaster.scala 7b7d86a43c69e72c47eaa91f68be24e0f4022891 
  samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala df5992e659302d2918c4e2c30b6122ed51ab9fe8 
  samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala 6f4bfaf916d687426f746fd3b09cd56490bb500e 
  samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 2eec65f02826de40493925c08ff344a8cc4feecb 

Diff: https://reviews.apache.org/r/37817/diff/


Testing
-------

Tested with hello-samza locally. Tested with a sample job on a 3-node YARN cluster. Works as expected.


Thanks,

Navina Ramesh


Re: Review Request 37817: SAMZA-619 - Modify SamzaAppMaster to enable host-affinity

Posted by Navina Ramesh <nr...@linkedin.com>.

> On Sept. 23, 2015, 1:38 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java, line 57
> > <https://reviews.apache.org/r/37817/diff/7/?file=1077571#file1077571line57>
> >
> >     nit: wouldn't it make more sense to set the init value to false?

If it is set to false when instantiated, I need to explicitly set it as true in SamzaTaskManager before starting the thread. Seems kind of confusing. That's why I let the default be true.


> On Sept. 23, 2015, 1:38 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java, line 110
> > <https://reviews.apache.org/r/37817/diff/7/?file=1077571#file1077571line110>
> >
> >     I realized that isRunning is not really a state variable of the Runnable. It is more like a "stop" flag that is set by another thread (maybe the main/admin thread). It would read better if we rename it to isStopped.

Yeah, it is a stop flag. But if I rename it, I have to change the conditions in the running loop, and I think while(isRunning) is easier to read than while(!isStopped). Also, the comment above about setting it explicitly in SamzaTaskManager applies here too; otherwise it is pretty confusing semantically.
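For illustration, a hypothetical allocator loop using the flag as discussed: isRunning starts as true so nothing has to be set before the thread starts, and it is volatile so a stop() call from another thread is seen by the loop. The class name, runOneRound() and the sleep parameter are assumptions standing in for the real allocator and its configurable sleep time.

```java
// Hypothetical sketch; only the isRunning flag mirrors the discussion above.
class AllocatorLoopSketch implements Runnable {
  // volatile so that stop() from another thread is visible to the loop.
  private volatile boolean isRunning = true;
  private final long sleepMs;

  AllocatorLoopSketch(long sleepMs) {
    this.sleepMs = sleepMs;
  }

  @Override
  public void run() {
    while (isRunning) {
      runOneRound();
      try {
        Thread.sleep(sleepMs);
      } catch (InterruptedException e) {
        // Restore the interrupt status and exit the loop.
        Thread.currentThread().interrupt();
        return;
      }
    }
  }

  /** Stand-in for one round of matching requests to allocated containers. */
  void runOneRound() {
  }

  /** Called by another thread (e.g. the task manager) to end the loop. */
  void stop() {
    isRunning = false;
  }
}
```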


> On Sept. 23, 2015, 1:38 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java, line 207
> > <https://reviews.apache.org/r/37817/diff/7/?file=1077574#file1077574line207>
> >
> >     nit: this should probably be logged at the same level as the non-host-affinity case (where we are using log.info).

Makes sense.


> On Sept. 23, 2015, 1:38 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java, line 116
> > <https://reviews.apache.org/r/37817/diff/7/?file=1077579#file1077579line116>
> >
> >     nit: per the comment, should we remove this? :)

Oh yes :)


> On Sept. 23, 2015, 1:38 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java, line 150
> > <https://reviews.apache.org/r/37817/diff/7/?file=1077579#file1077579line150>
> >
> >     nit: It seems to me that it would make more sense to move it to line 209, instead of in the method's javadoc.

Ok


> On Sept. 23, 2015, 1:38 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala, line 29
> > <https://reviews.apache.org/r/37817/diff/7/?file=1077603#file1077603line29>
> >
> >     I think that this is also YARN 2.6+ only. So, it seems that we will need to drop the support for YARN 2.4 and 2.5 to get this patch in. I will ping Yan to close the open source mailing list vote on this.

Yeah. I didn't realize that the voting was still open. Thanks for the follow-up!


> On Sept. 23, 2015, 1:38 a.m., Yi Pan (Data Infrastructure) wrote:
> > Overall this looks very good to me, except for a few nits. The only thing I would recommend is to hold off on this check-in until SAMZA-563 is settled, so that we officially drop support for YARN 2.4 and 2.5 and don't break ./bin/check-all.sh

Yep. Sure!


- Navina


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37817/#review100063
-----------------------------------------------------------


On Sept. 20, 2015, 4:45 a.m., Navina Ramesh wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/37817/
> -----------------------------------------------------------
> 
> (Updated Sept. 20, 2015, 4:45 a.m.)
> 
> 
> Review request for samza, Yan Fang, Chinmay Soman, Chris Riccomini, and Yi Pan (Data Infrastructure).
> 
> 
> Bugs: SAMZA-619
>     https://issues.apache.org/jira/browse/SAMZA-619
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> This is a major change to how we request and assign containers to resources. It uses a threaded model for request and allocation. More details are in the javadoc. 
> 
> Major changes to note:
> 1. Changed YARN dependency to 2.6
> 2. Moved YarnConfig to a Java config class
> 3. Removed SamzaAppMasterTaskManager
> 4. SamzaAppState replaces SamzaAppMasterState
> 5. SamzaTaskManager is very similar to SamzaAppMasterTaskManager, except that it delegates onContainerAllocated and requestContainers to the thread in ContainerAllocator
> 6. Removed state.unclaimedContainers
> 7. Allocator thread sleep time and request timeout are configurable
> 8. Host affinity can be enabled with the config "yarn.samza.host-affinity.enabled". Host affinity is guaranteed to work with a FairScheduler that has continuous scheduling enabled. Details on this config can be found in the SAMZA-617 [design](https://issues.apache.org/jira/secure/attachment/12726945/DESIGN-SAMZA-617-2.pdf).
> 9. Added unit tests for TaskManager & ContainerAllocator
> 10. Updated config documentation
> 11. Removed TestSamzaAppMasterTaskManager.scala
> 
> Pending items:
> 1. Update web-site with info on this feature (SAMZA-668)
> 
> 
> Diffs
> -----
> 
>   build.gradle 3a7fabcd4f4e5c8db950c3a33bba33618c5565b4 
>   docs/learn/documentation/versioned/jobs/configuration-table.html c23d7d37f151a0dbdd71f52588773bc67edf88c8 
>   gradle/dependency-versions.gradle 36d564b6ca895f042ee4802643e49180f4947b62 
>   samza-core/src/main/java/org/apache/samza/container/LocalityManager.java c567bf4a1747a7a6aff23be74cf25b1a44e57b42 
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java 5455881e2a4a9c865137f2708f655641b93c2bfb 
>   samza-core/src/main/java/org/apache/samza/job/model/JobModel.java 7b592740c082225fc217a24188bc874f883db63b 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 61e228b1ff8619da4ad2e11b15a5afb7999c4996 
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala c8438bd6ed01a95860867505d84c04c493a5a197 
>   samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerFailure.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java PRE-CREATION 
>   samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml ce2145adde09b0e8097a1c711bc296ee20392e63 
>   samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala 1f51551893c42bb13d7941c8b0e5594ac7f42585 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala af42c6a6636953a95f79837fe372e0dbd735df70 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala d475c4c566f17f88a04db5fbc84cc2e27eb333d2 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala 03acfe1bbbabf8f54be9f36fdae785476da45135 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala 060538623e4d67b986bc635518e7fe8ebdde9e24 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala f667c83a7fff43a5efdc64cde019b2cf35f38cb9 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala 1743c8611a94fa1c7f7dafd8ff8c713039f3df8c 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala 8dd70c9977473e083e463d01b049c40e15b21f4a 
>   samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala 09f4dc32a4b18aeb3accb856c360ea2f95c82673 
>   samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala 7fd51221149ba59e7ea1bc0b51d58726ec17a7d7 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerRequestState.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaContainerRequest.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerAllocator.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerUtil.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockHttpServer.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockNMClient.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/util/TestAMRMClientImpl.java PRE-CREATION 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/util/TestUtil.java PRE-CREATION 
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMaster.scala 7b7d86a43c69e72c47eaa91f68be24e0f4022891 
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala df5992e659302d2918c4e2c30b6122ed51ab9fe8 
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala 6f4bfaf916d687426f746fd3b09cd56490bb500e 
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 2eec65f02826de40493925c08ff344a8cc4feecb 
> 
> Diff: https://reviews.apache.org/r/37817/diff/
> 
> 
> Testing
> -------
> 
> Tested with hello-samza locally. Tested with a sample job on a 3-node YARN cluster. Works as expected.
> 
> 
> Thanks,
> 
> Navina Ramesh
> 
>


Re: Review Request 37817: SAMZA-619 - Modify SamzaAppMaster to enable host-affinity

Posted by "Yi Pan (Data Infrastructure)" <yi...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37817/#review100063
-----------------------------------------------------------



samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java (line 57)
<https://reviews.apache.org/r/37817/#comment157163>

    nit: wouldn't it make more sense to set the init value to false?



samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java (line 110)
<https://reviews.apache.org/r/37817/#comment157167>

    I realized that isRunning is not really a state variable of the Runnable. It is more like a "stop" flag that is set by another thread (maybe the main/admin thread). It would read better if we rename it to isStopped.



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java (line 207)
<https://reviews.apache.org/r/37817/#comment157173>

    nit: this should probably be logged at the same level as the non-host-affinity case (where we are using log.info).



samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java (line 116)
<https://reviews.apache.org/r/37817/#comment157233>

    nit: per the comment, should we remove this? :)



samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java (line 150)
<https://reviews.apache.org/r/37817/#comment157234>

    nit: It seems to me that it would make more sense to move it to line 209, instead of in the method's javadoc.



samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala (line 29)
<https://reviews.apache.org/r/37817/#comment157249>

    I think that this is also YARN 2.6+ only. So, it seems that we will need to drop the support for YARN 2.4 and 2.5 to get this patch in. I will ping Yan to close the open source mailing list vote on this.


Overall this looks very good to me, except for a few nits. The only thing I would recommend is to hold off on this check-in until SAMZA-563 is settled, so that we officially drop support for YARN 2.4 and 2.5 and don't break ./bin/check-all.sh

- Yi Pan (Data Infrastructure)




Re: Review Request 37817: SAMZA-619 - Modify SamzaAppMaster to enable host-affinity

Posted by Navina Ramesh <nr...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37817/
-----------------------------------------------------------

(Updated Sept. 20, 2015, 4:45 a.m.)


Review request for samza, Yan Fang, Chinmay Soman, Chris Riccomini, and Yi Pan (Data Infrastructure).


Bugs: SAMZA-619
    https://issues.apache.org/jira/browse/SAMZA-619


Repository: samza


Description (updated)
-------

This is a major change to how we request and assign containers to resources. It uses a threaded model for request and allocation. More details are in the javadoc. 

Major changes to note:
1. Changed YARN dependency to 2.6
2. Moved YarnConfig to a Java config class
3. Removed SamzaAppMasterTaskManager
4. SamzaAppState replaces SamzaAppMasterState
5. SamzaTaskManager is very similar to SamzaAppMasterTaskManager, except that it delegates onContainerAllocated and requestContainers to the thread in ContainerAllocator
6. Removed state.unclaimedContainers
7. Allocator thread sleep time and request timeout are configurable
8. Host affinity can be enabled with the config "yarn.samza.host-affinity.enabled". Host affinity is guaranteed to work with a FairScheduler that has continuous scheduling enabled. Details on this config can be found in the SAMZA-617 [design](https://issues.apache.org/jira/secure/attachment/12726945/DESIGN-SAMZA-617-2.pdf).
9. Added unit tests for TaskManager & ContainerAllocator
10. Updated config documentation
11. Removed TestSamzaAppMasterTaskManager.scala
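
A minimal sketch of how the "yarn.samza.host-affinity.enabled" flag might select between the two allocators listed in the diffs (ContainerAllocator and HostAwareContainerAllocator). Only the config key comes from this request; the plain Map<String, String> config, the default of false when the key is absent, and the AllocatorSelectionSketch/Strategy names are assumptions for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch: pick an allocation strategy from the host-affinity flag.
 * Assumption: a missing or non-"true" value means host affinity is disabled.
 */
class AllocatorSelectionSketch {
  static final String HOST_AFFINITY_ENABLED = "yarn.samza.host-affinity.enabled";

  /** HOST_AWARE stands in for HostAwareContainerAllocator, ANY_HOST for ContainerAllocator. */
  enum Strategy { ANY_HOST, HOST_AWARE }

  static boolean hostAffinityEnabled(Map<String, String> config) {
    // Boolean.parseBoolean returns false for null or any value other than "true" (case-insensitive)
    return Boolean.parseBoolean(config.get(HOST_AFFINITY_ENABLED));
  }

  static Strategy choose(Map<String, String> config) {
    return hostAffinityEnabled(config) ? Strategy.HOST_AWARE : Strategy.ANY_HOST;
  }
}
```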

Pending items:
1. Update web-site with info on this feature (SAMZA-668)


Diffs (updated)
-----

  build.gradle 3a7fabcd4f4e5c8db950c3a33bba33618c5565b4 
  docs/learn/documentation/versioned/jobs/configuration-table.html c23d7d37f151a0dbdd71f52588773bc67edf88c8 
  gradle/dependency-versions.gradle 36d564b6ca895f042ee4802643e49180f4947b62 
  samza-core/src/main/java/org/apache/samza/container/LocalityManager.java c567bf4a1747a7a6aff23be74cf25b1a44e57b42 
  samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java 5455881e2a4a9c865137f2708f655641b93c2bfb 
  samza-core/src/main/java/org/apache/samza/job/model/JobModel.java 7b592740c082225fc217a24188bc874f883db63b 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 61e228b1ff8619da4ad2e11b15a5afb7999c4996 
  samza-core/src/main/scala/org/apache/samza/util/Util.scala c8438bd6ed01a95860867505d84c04c493a5a197 
  samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerFailure.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java PRE-CREATION 
  samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml ce2145adde09b0e8097a1c711bc296ee20392e63 
  samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala 1f51551893c42bb13d7941c8b0e5594ac7f42585 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala af42c6a6636953a95f79837fe372e0dbd735df70 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala d475c4c566f17f88a04db5fbc84cc2e27eb333d2 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala 03acfe1bbbabf8f54be9f36fdae785476da45135 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala 060538623e4d67b986bc635518e7fe8ebdde9e24 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala f667c83a7fff43a5efdc64cde019b2cf35f38cb9 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala 1743c8611a94fa1c7f7dafd8ff8c713039f3df8c 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala 8dd70c9977473e083e463d01b049c40e15b21f4a 
  samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala 09f4dc32a4b18aeb3accb856c360ea2f95c82673 
  samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala 7fd51221149ba59e7ea1bc0b51d58726ec17a7d7 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerRequestState.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaContainerRequest.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerAllocator.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerUtil.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockHttpServer.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockNMClient.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/util/TestAMRMClientImpl.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/util/TestUtil.java PRE-CREATION 
  samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMaster.scala 7b7d86a43c69e72c47eaa91f68be24e0f4022891 
  samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala df5992e659302d2918c4e2c30b6122ed51ab9fe8 
  samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala 6f4bfaf916d687426f746fd3b09cd56490bb500e 
  samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 2eec65f02826de40493925c08ff344a8cc4feecb 

Diff: https://reviews.apache.org/r/37817/diff/


Testing
-------

Tested with hello-samza locally. Tested with a sample job on a 3-node YARN cluster. Works as expected.


Thanks,

Navina Ramesh


Re: Review Request 37817: SAMZA-619 - Modify SamzaAppMaster to enable host-affinity

Posted by Navina Ramesh <nr...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37817/
-----------------------------------------------------------

(Updated Sept. 16, 2015, 8:28 a.m.)


Review request for samza, Yan Fang, Chinmay Soman, Chris Riccomini, and Yi Pan (Data Infrastructure).


Bugs: SAMZA-619
    https://issues.apache.org/jira/browse/SAMZA-619


Repository: samza


Description
-------

This is a major change to how we request and assign containers to resources. It uses a threaded model for request and allocation. More details are in the javadoc. 

Major changes to note:
1. Changed YARN dependency to 2.6
2. Moved YarnConfig to a Java config class
3. Removed SamzaAppMasterTaskManager
4. SamzaAppState replaces SamzaAppMasterState
5. SamzaTaskManager is very similar to SamzaAppMasterTaskManager, except that it delegates onContainerAllocated and requestContainers to the thread in ContainerAllocator
6. Removed state.unclaimedContainers
7. Allocator thread sleep time and request timeout are configurable
8. Host affinity can be enabled with the config "yarn.samza.host-affinity.enabled". Host affinity is guaranteed to work with a FairScheduler that has continuous scheduling enabled. Details on this config can be found in the SAMZA-617 [design](https://issues.apache.org/jira/secure/attachment/12726945/DESIGN-SAMZA-617-2.pdf).
9. Added unit tests for TaskManager & ContainerAllocator
10. Updated config documentation
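
How a host-aware allocator might combine host affinity with the configurable request timeout, sketched under an assumption: a request that has waited at least requestTimeoutMs falls back to a container on any host. HostAwareMatchSketch, Request and match are hypothetical names; this is not the HostAwareContainerAllocator implementation, and allocated containers are reduced to plain host-name strings.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical two-pass matcher: preferred hosts first, then timed-out requests take leftovers. */
class HostAwareMatchSketch {
  /** A pending request for a Samza container id, preferring a specific host. */
  static final class Request {
    final String containerId;
    final String preferredHost;
    final long createdAtMs;

    Request(String containerId, String preferredHost, long createdAtMs) {
      this.containerId = containerId;
      this.preferredHost = preferredHost;
      this.createdAtMs = createdAtMs;
    }
  }

  /**
   * Returns assignments as "containerId@host". Matched requests and consumed
   * hosts are removed from the (mutable) input lists.
   */
  static List<String> match(List<Request> pending, List<String> allocatedHosts,
                            long nowMs, long requestTimeoutMs) {
    List<String> assignments = new ArrayList<>();
    // Pass 1: honor host affinity wherever a container landed on the preferred host.
    for (Iterator<Request> it = pending.iterator(); it.hasNext(); ) {
      Request r = it.next();
      if (allocatedHosts.remove(r.preferredHost)) { // List.remove(Object)
        assignments.add(r.containerId + "@" + r.preferredHost);
        it.remove();
      }
    }
    // Pass 2: requests that have waited past the timeout accept any leftover host.
    for (Iterator<Request> it = pending.iterator(); it.hasNext(); ) {
      Request r = it.next();
      if (nowMs - r.createdAtMs >= requestTimeoutMs && !allocatedHosts.isEmpty()) {
        String host = allocatedHosts.remove(0); // List.remove(int)
        assignments.add(r.containerId + "@" + host);
        it.remove();
      }
    }
    return assignments;
  }
}
```

Running the preferred-host pass first means a timed-out request only takes containers that no pending request in the same round asked for by host.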

Pending items:
1. Update web-site with info on this feature (SAMZA-668)


Diffs (updated)
-----

  build.gradle 3a7fabcd4f4e5c8db950c3a33bba33618c5565b4 
  docs/contribute/coding-guide.md 6cb4cee6e182172e840e907c9e3ededddb7f4bc2 
  docs/learn/documentation/versioned/jobs/configuration-table.html c23d7d37f151a0dbdd71f52588773bc67edf88c8 
  gradle/dependency-versions.gradle 36d564b6ca895f042ee4802643e49180f4947b62 
  samza-core/src/main/java/org/apache/samza/container/LocalityManager.java c567bf4a1747a7a6aff23be74cf25b1a44e57b42 
  samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java 5455881e2a4a9c865137f2708f655641b93c2bfb 
  samza-core/src/main/java/org/apache/samza/job/model/JobModel.java 7b592740c082225fc217a24188bc874f883db63b 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 61e228b1ff8619da4ad2e11b15a5afb7999c4996 
  samza-core/src/main/scala/org/apache/samza/util/Util.scala c8438bd6ed01a95860867505d84c04c493a5a197 
  samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerFailure.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java PRE-CREATION 
  samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml ce2145adde09b0e8097a1c711bc296ee20392e63 
  samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala 1f51551893c42bb13d7941c8b0e5594ac7f42585 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala af42c6a6636953a95f79837fe372e0dbd735df70 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala d475c4c566f17f88a04db5fbc84cc2e27eb333d2 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala 03acfe1bbbabf8f54be9f36fdae785476da45135 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala 060538623e4d67b986bc635518e7fe8ebdde9e24 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala f667c83a7fff43a5efdc64cde019b2cf35f38cb9 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala 1743c8611a94fa1c7f7dafd8ff8c713039f3df8c 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala 8dd70c9977473e083e463d01b049c40e15b21f4a 
  samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala 09f4dc32a4b18aeb3accb856c360ea2f95c82673 
  samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala 7fd51221149ba59e7ea1bc0b51d58726ec17a7d7 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerRequestState.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaContainerRequest.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerAllocator.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerUtil.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockHttpServer.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockNMClient.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/util/TestAMRMClientImpl.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/util/TestUtil.java PRE-CREATION 
  samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala df5992e659302d2918c4e2c30b6122ed51ab9fe8 
  samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala 6f4bfaf916d687426f746fd3b09cd56490bb500e 
  samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 2eec65f02826de40493925c08ff344a8cc4feecb 

Diff: https://reviews.apache.org/r/37817/diff/


Testing
-------

Tested with hello-samza locally. Tested with a sample job on a 3-node YARN cluster. Works as expected.


Thanks,

Navina Ramesh


Re: Review Request 37817: SAMZA-619 - Modify SamzaAppMaster to enable host-affinity

Posted by Navina Ramesh <nr...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37817/
-----------------------------------------------------------

(Updated Sept. 9, 2015, 12:28 a.m.)


Review request for samza, Yan Fang, Chinmay Soman, Chris Riccomini, and Yi Pan (Data Infrastructure).


Changes
-------

Just updated some javadoc comments


Bugs: SAMZA-619
    https://issues.apache.org/jira/browse/SAMZA-619


Repository: samza


Description
-------

This is a major change to how we request and assign containers to resources. It uses a threaded model for request and allocation. More details are in the javadoc. 

Major changes to note:
1. Changed YARN dependency to 2.6
2. Moved YarnConfig to a Java config class
3. Removed SamzaAppMasterTaskManager
4. SamzaAppState replaces SamzaAppMasterState
5. SamzaTaskManager is very similar to SamzaAppMasterTaskManager, except that it delegates onContainerAllocated and requestContainers to the thread in ContainerAllocator
6. Removed state.unclaimedContainers
7. Allocator thread sleep time and request timeout are configurable
8. Host affinity can be enabled with the config "yarn.samza.host-affinity.enabled". Host affinity is guaranteed to work with a FairScheduler that has continuous scheduling enabled. Details on this config can be found in the SAMZA-617 [design](https://issues.apache.org/jira/secure/attachment/12726945/DESIGN-SAMZA-617-2.pdf).
9. Added unit tests for TaskManager & ContainerAllocator
10. Updated config documentation

Pending items:
1. Update web-site with info on this feature (SAMZA-668)
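To make the host-affinity option above more concrete, here is a minimal, hypothetical sketch of the matching idea (it is not the HostAwareContainerAllocator code). It assumes each pending request carries the host its Samza container last ran on, that allocated YARN containers are counted per host, and that a request which has waited longer than the request timeout falls back to any host. The class, field names, and the `ANY_HOST` marker are illustrative assumptions.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of host-affinity matching; names are illustrative only.
public class HostAffinitySketch {
  static final String ANY_HOST = "ANY_HOST";  // assumed marker for "no host preference"

  public static final class Request {
    final int samzaContainerId;
    final String preferredHost;
    final long requestedAtMs;
    public Request(int samzaContainerId, String preferredHost, long requestedAtMs) {
      this.samzaContainerId = samzaContainerId;
      this.preferredHost = preferredHost;
      this.requestedAtMs = requestedAtMs;
    }
  }

  public static final class Allocation {
    final int samzaContainerId;
    final String host;
    Allocation(int samzaContainerId, String host) {
      this.samzaContainerId = samzaContainerId;
      this.host = host;
    }
    @Override public String toString() { return "container " + samzaContainerId + " -> " + host; }
  }

  /** One matching pass; unmatched requests stay queued for the next pass. */
  public static List<Allocation> assign(Deque<Request> pending, Map<String, Integer> freeByHost,
                                        long nowMs, long requestTimeoutMs) {
    List<Allocation> matched = new ArrayList<>();
    int n = pending.size();
    for (int i = 0; i < n; i++) {
      Request r = pending.poll();
      String host = null;
      if (freeByHost.getOrDefault(r.preferredHost, 0) > 0) {
        host = r.preferredHost;                       // affinity satisfied
      } else if (ANY_HOST.equals(r.preferredHost) || nowMs - r.requestedAtMs > requestTimeoutMs) {
        host = anyHostWithFreeContainer(freeByHost);  // no preference, or waited too long
      }
      if (host == null) {
        pending.addLast(r);                           // keep waiting for the preferred host
      } else {
        freeByHost.merge(host, -1, Integer::sum);     // consume one container on that host
        matched.add(new Allocation(r.samzaContainerId, host));
      }
    }
    return matched;
  }

  static String anyHostWithFreeContainer(Map<String, Integer> freeByHost) {
    for (Map.Entry<String, Integer> e : freeByHost.entrySet()) {
      if (e.getValue() > 0) return e.getKey();
    }
    return null;
  }

  public static void main(String[] args) {
    Deque<Request> pending = new ArrayDeque<>();
    pending.add(new Request(0, "host-a", 0L));
    pending.add(new Request(1, "host-b", 0L));
    Map<String, Integer> free = new HashMap<>();
    free.put("host-a", 1);
    free.put("host-c", 1);
    System.out.println(assign(pending, free, 100L, 1000L));   // [container 0 -> host-a]
    System.out.println(assign(pending, free, 2000L, 1000L));  // [container 1 -> host-c]
  }
}
```

The timeout is what keeps a job from stalling when its preferred host never gets a container: locality is preferred, not required.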


Diffs (updated)
-----

  build.gradle 3a7fabcd4f4e5c8db950c3a33bba33618c5565b4 
  docs/learn/documentation/versioned/jobs/configuration-table.html c23d7d37f151a0dbdd71f52588773bc67edf88c8 
  gradle/dependency-versions.gradle 36d564b6ca895f042ee4802643e49180f4947b62 
  samza-core/src/main/java/org/apache/samza/container/LocalityManager.java c567bf4a1747a7a6aff23be74cf25b1a44e57b42 
  samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java 5455881e2a4a9c865137f2708f655641b93c2bfb 
  samza-core/src/main/java/org/apache/samza/job/model/JobModel.java 7b592740c082225fc217a24188bc874f883db63b 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 61e228b1ff8619da4ad2e11b15a5afb7999c4996 
  samza-core/src/main/scala/org/apache/samza/util/Util.scala c8438bd6ed01a95860867505d84c04c493a5a197 
  samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerFailure.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java PRE-CREATION 
  samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml ce2145adde09b0e8097a1c711bc296ee20392e63 
  samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala 1f51551893c42bb13d7941c8b0e5594ac7f42585 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala af42c6a6636953a95f79837fe372e0dbd735df70 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala d475c4c566f17f88a04db5fbc84cc2e27eb333d2 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala 03acfe1bbbabf8f54be9f36fdae785476da45135 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala 060538623e4d67b986bc635518e7fe8ebdde9e24 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala f667c83a7fff43a5efdc64cde019b2cf35f38cb9 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala 1743c8611a94fa1c7f7dafd8ff8c713039f3df8c 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala 8dd70c9977473e083e463d01b049c40e15b21f4a 
  samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala 09f4dc32a4b18aeb3accb856c360ea2f95c82673 
  samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala 7fd51221149ba59e7ea1bc0b51d58726ec17a7d7 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerRequestState.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaContainerRequest.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerAllocator.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerUtil.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockHttpServer.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockNMClient.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/util/TestAMRMClientImpl.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/util/TestUtil.java PRE-CREATION 
  samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala df5992e659302d2918c4e2c30b6122ed51ab9fe8 
  samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala 6f4bfaf916d687426f746fd3b09cd56490bb500e 
  samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 2eec65f02826de40493925c08ff344a8cc4feecb 

Diff: https://reviews.apache.org/r/37817/diff/


Testing
-------

Tested with hello-samza locally, and with a sample job on a 3-node YARN cluster. Works as expected.


Thanks,

Navina Ramesh


Re: Review Request 37817: SAMZA-619 - Modify SamzaAppMaster to enable host-affinity

Posted by Navina Ramesh <nr...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37817/
-----------------------------------------------------------

(Updated Sept. 8, 2015, 11:17 p.m.)


Review request for samza, Yan Fang, Chinmay Soman, Chris Riccomini, and Yi Pan (Data Infrastructure).


Bugs: SAMZA-619
    https://issues.apache.org/jira/browse/SAMZA-619


Repository: samza


Description (updated)
-------

This is a major change to how we request containers and assign them to resources. It uses a threaded model for request and allocation; more details are in the javadoc.

Major changes to note:
1. Changed the YARN dependency to 2.6
2. Moved YarnConfig to a Java config class
3. Removed SamzaAppMasterTaskManager
4. SamzaAppState replaces SamzaAppMasterState
5. SamzaTaskManager is very similar to SamzaAppMasterTaskManager, except that it delegates onContainerAllocated and requestContainers to the thread in ContainerAllocator
6. Removed state.unclaimedContainers
7. The allocator thread sleep time and the request timeout are configurable
8. Host affinity can be enabled with the config "yarn.samza.host-affinity.enabled". Host affinity is guaranteed to work with the FairScheduler when continuous scheduling is enabled. Details on this config can be found in the SAMZA-617 [design](https://issues.apache.org/jira/secure/attachment/12726945/DESIGN-SAMZA-617-2.pdf).
9. Added unit tests for TaskManager and ContainerAllocator
10. Updated the config documentation

Pending items:
1. Update the website with information on this feature (SAMZA-668)


Diffs (updated)
-----

  build.gradle 3a7fabcd4f4e5c8db950c3a33bba33618c5565b4 
  docs/learn/documentation/versioned/jobs/configuration-table.html c23d7d37f151a0dbdd71f52588773bc67edf88c8 
  gradle/dependency-versions.gradle 36d564b6ca895f042ee4802643e49180f4947b62 
  samza-core/src/main/java/org/apache/samza/container/LocalityManager.java c567bf4a1747a7a6aff23be74cf25b1a44e57b42 
  samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java 5455881e2a4a9c865137f2708f655641b93c2bfb 
  samza-core/src/main/java/org/apache/samza/job/model/JobModel.java 7b592740c082225fc217a24188bc874f883db63b 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 61e228b1ff8619da4ad2e11b15a5afb7999c4996 
  samza-core/src/main/scala/org/apache/samza/util/Util.scala c8438bd6ed01a95860867505d84c04c493a5a197 
  samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerFailure.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java PRE-CREATION 
  samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml ce2145adde09b0e8097a1c711bc296ee20392e63 
  samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala 1f51551893c42bb13d7941c8b0e5594ac7f42585 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala af42c6a6636953a95f79837fe372e0dbd735df70 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala d475c4c566f17f88a04db5fbc84cc2e27eb333d2 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala 03acfe1bbbabf8f54be9f36fdae785476da45135 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala 060538623e4d67b986bc635518e7fe8ebdde9e24 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala f667c83a7fff43a5efdc64cde019b2cf35f38cb9 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala 1743c8611a94fa1c7f7dafd8ff8c713039f3df8c 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala 8dd70c9977473e083e463d01b049c40e15b21f4a 
  samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala 09f4dc32a4b18aeb3accb856c360ea2f95c82673 
  samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala 7fd51221149ba59e7ea1bc0b51d58726ec17a7d7 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerRequestState.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaContainerRequest.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerAllocator.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerUtil.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockHttpServer.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockNMClient.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/util/TestAMRMClientImpl.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/util/TestUtil.java PRE-CREATION 
  samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala df5992e659302d2918c4e2c30b6122ed51ab9fe8 
  samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala 6f4bfaf916d687426f746fd3b09cd56490bb500e 
  samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 2eec65f02826de40493925c08ff344a8cc4feecb 

Diff: https://reviews.apache.org/r/37817/diff/


Testing
-------

Tested with hello-samza locally, and with a sample job on a 3-node YARN cluster. Works as expected.


Thanks,

Navina Ramesh


Re: Review Request 37817: SAMZA-619 - Modify SamzaAppMaster to enable host-affinity

Posted by Navina Ramesh <nr...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37817/
-----------------------------------------------------------

(Updated Sept. 2, 2015, 9:33 p.m.)


Review request for samza, Yan Fang, Chinmay Soman, Chris Riccomini, and Yi Pan (Data Infrastructure).


Bugs: SAMZA-619
    https://issues.apache.org/jira/browse/SAMZA-619


Repository: samza


Description
-------

This is a major change to how we request containers and assign them to resources. It uses a threaded model for request and allocation; more details are in the javadoc.

Major changes to note:
1. Changed the YARN dependency to 2.6
2. Moved YarnConfig to a Java config class
3. Removed SamzaAppMasterTaskManager
4. SamzaAppState replaces SamzaAppMasterState
5. SamzaTaskManager is very similar to SamzaAppMasterTaskManager, except that it delegates onContainerAllocated and requestContainers to the thread in ContainerAllocator
6. Removed state.unclaimedContainers
7. The allocator thread sleep time and the request timeout are configurable
8. Host affinity can be enabled with the config "yarn.samza.host-affinity.enabled". Host affinity is guaranteed to work with the FairScheduler when continuous scheduling is enabled. Details on this config can be found in the SAMZA-617 [design](https://issues.apache.org/jira/secure/attachment/12726945/DESIGN-SAMZA-617-2.pdf).

Pending items:
1. Add unit tests for AppMaster, TaskManager, and ContainerAllocator
2. Update the config documentation
3. Update the website with information on this feature


Diffs (updated)
-----

  build.gradle 3a7fabcd4f4e5c8db950c3a33bba33618c5565b4 
  docs/learn/documentation/versioned/jobs/configuration-table.html c23d7d37f151a0dbdd71f52588773bc67edf88c8 
  gradle/dependency-versions.gradle 36d564b6ca895f042ee4802643e49180f4947b62 
  samza-core/src/main/java/org/apache/samza/container/LocalityManager.java c567bf4a1747a7a6aff23be74cf25b1a44e57b42 
  samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java 5455881e2a4a9c865137f2708f655641b93c2bfb 
  samza-core/src/main/java/org/apache/samza/job/model/JobModel.java 7b592740c082225fc217a24188bc874f883db63b 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 61e228b1ff8619da4ad2e11b15a5afb7999c4996 
  samza-core/src/main/scala/org/apache/samza/util/Util.scala c8438bd6ed01a95860867505d84c04c493a5a197 
  samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerFailure.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java PRE-CREATION 
  samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml ce2145adde09b0e8097a1c711bc296ee20392e63 
  samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala 1f51551893c42bb13d7941c8b0e5594ac7f42585 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala af42c6a6636953a95f79837fe372e0dbd735df70 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala d475c4c566f17f88a04db5fbc84cc2e27eb333d2 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala 03acfe1bbbabf8f54be9f36fdae785476da45135 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala 060538623e4d67b986bc635518e7fe8ebdde9e24 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala f667c83a7fff43a5efdc64cde019b2cf35f38cb9 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala 1743c8611a94fa1c7f7dafd8ff8c713039f3df8c 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala 8dd70c9977473e083e463d01b049c40e15b21f4a 
  samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala 09f4dc32a4b18aeb3accb856c360ea2f95c82673 
  samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala 7fd51221149ba59e7ea1bc0b51d58726ec17a7d7 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerRequestState.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaContainerRequest.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerAllocator.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerUtil.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockHttpServer.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockNMClient.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/util/TestAMRMClientImpl.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/util/TestUtil.java PRE-CREATION 
  samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala df5992e659302d2918c4e2c30b6122ed51ab9fe8 
  samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala 6f4bfaf916d687426f746fd3b09cd56490bb500e 
  samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 2eec65f02826de40493925c08ff344a8cc4feecb 

Diff: https://reviews.apache.org/r/37817/diff/


Testing
-------

Tested with hello-samza locally, and with a sample job on a 3-node YARN cluster. Works as expected.


Thanks,

Navina Ramesh


Re: Review Request 37817: SAMZA-619 - Modify SamzaAppMaster to enable host-affinity

Posted by Navina Ramesh <nr...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37817/
-----------------------------------------------------------

(Updated Sept. 2, 2015, 7:17 p.m.)


Review request for samza, Yan Fang, Chinmay Soman, Chris Riccomini, and Yi Pan (Data Infrastructure).


Bugs: SAMZA-619
    https://issues.apache.org/jira/browse/SAMZA-619


Repository: samza


Description
-------

This is a major change to how we request containers and assign them to resources. It uses a threaded model for request and allocation; more details are in the javadoc.

Major changes to note:
1. Changed the YARN dependency to 2.6
2. Moved YarnConfig to a Java config class
3. Removed SamzaAppMasterTaskManager
4. SamzaAppState replaces SamzaAppMasterState
5. SamzaTaskManager is very similar to SamzaAppMasterTaskManager, except that it delegates onContainerAllocated and requestContainers to the thread in ContainerAllocator
6. Removed state.unclaimedContainers
7. The allocator thread sleep time and the request timeout are configurable
8. Host affinity can be enabled with the config "yarn.samza.host-affinity.enabled". Host affinity is guaranteed to work with the FairScheduler when continuous scheduling is enabled. Details on this config can be found in the SAMZA-617 [design](https://issues.apache.org/jira/secure/attachment/12726945/DESIGN-SAMZA-617-2.pdf).

Pending items:
1. Add unit tests for AppMaster, TaskManager, and ContainerAllocator
2. Update the config documentation
3. Update the website with information on this feature


Diffs
-----

  build.gradle 3a7fabcd4f4e5c8db950c3a33bba33618c5565b4 
  docs/learn/documentation/versioned/jobs/configuration-table.html c23d7d37f151a0dbdd71f52588773bc67edf88c8 
  gradle/dependency-versions.gradle 36d564b6ca895f042ee4802643e49180f4947b62 
  samza-core/src/main/java/org/apache/samza/container/LocalityManager.java c567bf4a1747a7a6aff23be74cf25b1a44e57b42 
  samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java 5455881e2a4a9c865137f2708f655641b93c2bfb 
  samza-core/src/main/java/org/apache/samza/job/model/JobModel.java 7b592740c082225fc217a24188bc874f883db63b 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 61e228b1ff8619da4ad2e11b15a5afb7999c4996 
  samza-core/src/main/scala/org/apache/samza/util/Util.scala c8438bd6ed01a95860867505d84c04c493a5a197 
  samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerFailure.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java PRE-CREATION 
  samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml ce2145adde09b0e8097a1c711bc296ee20392e63 
  samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala 1f51551893c42bb13d7941c8b0e5594ac7f42585 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala af42c6a6636953a95f79837fe372e0dbd735df70 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala d475c4c566f17f88a04db5fbc84cc2e27eb333d2 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala 03acfe1bbbabf8f54be9f36fdae785476da45135 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala 060538623e4d67b986bc635518e7fe8ebdde9e24 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala f667c83a7fff43a5efdc64cde019b2cf35f38cb9 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala 1743c8611a94fa1c7f7dafd8ff8c713039f3df8c 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala 8dd70c9977473e083e463d01b049c40e15b21f4a 
  samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala 09f4dc32a4b18aeb3accb856c360ea2f95c82673 
  samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala 7fd51221149ba59e7ea1bc0b51d58726ec17a7d7 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerRequestState.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaContainerRequest.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerAllocator.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerUtil.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockHttpServer.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockNMClient.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/util/TestAMRMClientImpl.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/util/TestUtil.java PRE-CREATION 
  samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala df5992e659302d2918c4e2c30b6122ed51ab9fe8 
  samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala 6f4bfaf916d687426f746fd3b09cd56490bb500e 
  samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 2eec65f02826de40493925c08ff344a8cc4feecb 

Diff: https://reviews.apache.org/r/37817/diff/


Testing
-------

Tested with hello-samza locally, and with a sample job on a 3-node YARN cluster. Works as expected.


Thanks,

Navina Ramesh


Re: Review Request 37817: SAMZA-619 - Modify SamzaAppMaster to enable host-affinity

Posted by Navina Ramesh <nr...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37817/
-----------------------------------------------------------

(Updated Sept. 2, 2015, 7:17 p.m.)


Review request for samza, Yan Fang, Chinmay Soman, Chris Riccomini, and Yi Pan (Data Infrastructure).


Changes
-------

The unit test code may still contain some commented-out lines; I am working on cleaning them up.


Bugs: SAMZA-619
    https://issues.apache.org/jira/browse/SAMZA-619


Repository: samza


Description
-------

This is a major change to how we request containers and assign them to resources. It uses a threaded model for request and allocation; more details are in the javadoc.

Major changes to note:
1. Changed the YARN dependency to 2.6
2. Moved YarnConfig to a Java config class
3. Removed SamzaAppMasterTaskManager
4. SamzaAppState replaces SamzaAppMasterState
5. SamzaTaskManager is very similar to SamzaAppMasterTaskManager, except that it delegates onContainerAllocated and requestContainers to the thread in ContainerAllocator
6. Removed state.unclaimedContainers
7. The allocator thread sleep time and the request timeout are configurable
8. Host affinity can be enabled with the config "yarn.samza.host-affinity.enabled". Host affinity is guaranteed to work with the FairScheduler when continuous scheduling is enabled. Details on this config can be found in the SAMZA-617 [design](https://issues.apache.org/jira/secure/attachment/12726945/DESIGN-SAMZA-617-2.pdf).

Pending items:
1. Add unit tests for AppMaster, TaskManager, and ContainerAllocator
2. Update the config documentation
3. Update the website with information on this feature


Diffs (updated)
-----

  build.gradle 3a7fabcd4f4e5c8db950c3a33bba33618c5565b4 
  docs/learn/documentation/versioned/jobs/configuration-table.html c23d7d37f151a0dbdd71f52588773bc67edf88c8 
  gradle/dependency-versions.gradle 36d564b6ca895f042ee4802643e49180f4947b62 
  samza-core/src/main/java/org/apache/samza/container/LocalityManager.java c567bf4a1747a7a6aff23be74cf25b1a44e57b42 
  samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java 5455881e2a4a9c865137f2708f655641b93c2bfb 
  samza-core/src/main/java/org/apache/samza/job/model/JobModel.java 7b592740c082225fc217a24188bc874f883db63b 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 61e228b1ff8619da4ad2e11b15a5afb7999c4996 
  samza-core/src/main/scala/org/apache/samza/util/Util.scala c8438bd6ed01a95860867505d84c04c493a5a197 
  samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerFailure.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java PRE-CREATION 
  samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml ce2145adde09b0e8097a1c711bc296ee20392e63 
  samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala 1f51551893c42bb13d7941c8b0e5594ac7f42585 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala af42c6a6636953a95f79837fe372e0dbd735df70 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala d475c4c566f17f88a04db5fbc84cc2e27eb333d2 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala 03acfe1bbbabf8f54be9f36fdae785476da45135 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala 060538623e4d67b986bc635518e7fe8ebdde9e24 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala f667c83a7fff43a5efdc64cde019b2cf35f38cb9 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala 1743c8611a94fa1c7f7dafd8ff8c713039f3df8c 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala 8dd70c9977473e083e463d01b049c40e15b21f4a 
  samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala 09f4dc32a4b18aeb3accb856c360ea2f95c82673 
  samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala 7fd51221149ba59e7ea1bc0b51d58726ec17a7d7 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerRequestState.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaContainerRequest.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerAllocator.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerUtil.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockHttpServer.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockNMClient.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/util/TestAMRMClientImpl.java PRE-CREATION 
  samza-yarn/src/test/java/org/apache/samza/job/yarn/util/TestUtil.java PRE-CREATION 
  samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala df5992e659302d2918c4e2c30b6122ed51ab9fe8 
  samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala 6f4bfaf916d687426f746fd3b09cd56490bb500e 
  samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 2eec65f02826de40493925c08ff344a8cc4feecb 

Diff: https://reviews.apache.org/r/37817/diff/


Testing
-------

Tested with hello-samza locally, and with a sample job on a 3-node YARN cluster. Works as expected.


Thanks,

Navina Ramesh


Re: Review Request 37817: SAMZA-619 - Modify SamzaAppMaster to enable host-affinity

Posted by Navina Ramesh <nr...@linkedin.com>.

> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/container/LocalityManager.java, line 82
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055358#file1055358line82>
> >
> >     What's the reason to have both hostname and hostip here? Some javadoc would be nice

Actually, when I made the change to write the container-to-host mapping to the coordinator stream (in SAMZA-618), I used the hostIp. When I started working on host affinity, I realized that YARN returns the allocated container with metadata about its host, and that metadata contains the hostname rather than the IP.

I think we can keep only hostName. I doubt we are using hostIp anywhere else.
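A small, hypothetical illustration of why storing only the hostname is enough: the allocated YARN `Container` exposes its node via `getNodeId().getHost()`, so the Application Master can compare that value directly with the stored host. The in-memory map below stands in for the coordinator-stream mapping and is an assumption, not the LocalityManager API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the container-to-host mapping; not LocalityManager.
public class ContainerHostMappingSketch {
  private final Map<Integer, String> hostByContainerId = new HashMap<>();

  /** Records the hostname a Samza container last ran on, replacing any earlier value. */
  public void writeHost(int containerId, String hostName) {
    hostByContainerId.put(containerId, hostName);
  }

  /** Returns the last known hostname, or null if the container has never run. */
  public String readHost(int containerId) {
    return hostByContainerId.get(containerId);
  }

  /** True when a container allocated on allocatedHost matches the stored hostname. */
  public boolean matchesPreferredHost(int containerId, String allocatedHost) {
    String preferred = readHost(containerId);
    // DNS hostnames are case-insensitive, so compare ignoring case.
    return preferred != null && preferred.equalsIgnoreCase(allocatedHost);
  }

  public static void main(String[] args) {
    ContainerHostMappingSketch mapping = new ContainerHostMappingSketch();
    mapping.writeHost(0, "node1.example.com");
    System.out.println(mapping.matchesPreferredHost(0, "node1.example.com")); // true
    System.out.println(mapping.matchesPreferredHost(0, "10.0.0.1"));          // false
  }
}
```

If only the IP were stored, every comparison would need a resolution step, which is the mismatch described above.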


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java, line 51
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055363#file1055363line51>
> >
> >     Just curious: why are we choosing 3.6s as the sleep time here? Is it from experience?

Not really. I just chose those as defaults. We should be able to configure each of them, and I will add docs for them in the configuration table. I am not sure what the right defaults are without experimenting with jobs that have varying configs.
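A minimal sketch of reading these settings with per-key defaults. Only `yarn.samza.host-affinity.enabled` comes from this review; the sleep-time and request-timeout key names, and the 5000 ms timeout default, are hypothetical placeholders, while 3600 ms mirrors the 3.6s sleep time asked about above.

```java
import java.util.Map;

// Sketch of allocator settings with fallback defaults; not the actual YarnConfig class.
class AllocatorSettings {
  static final String HOST_AFFINITY_ENABLED = "yarn.samza.host-affinity.enabled";
  static final String ALLOCATOR_SLEEP_MS = "yarn.allocator.sleep.ms";           // hypothetical key
  static final String REQUEST_TIMEOUT_MS = "yarn.container.request.timeout.ms"; // hypothetical key

  static final int DEFAULT_ALLOCATOR_SLEEP_MS = 3600; // the 3.6s default asked about above
  static final int DEFAULT_REQUEST_TIMEOUT_MS = 5000; // placeholder, not taken from the patch

  private final Map<String, String> config;

  AllocatorSettings(Map<String, String> config) {
    this.config = config;
  }

  boolean hostAffinityEnabled() {
    // Boolean.parseBoolean(null) is false, so in this sketch affinity is off unless set to "true".
    return Boolean.parseBoolean(config.get(HOST_AFFINITY_ENABLED));
  }

  int allocatorSleepMs() {
    return getInt(ALLOCATOR_SLEEP_MS, DEFAULT_ALLOCATOR_SLEEP_MS);
  }

  int requestTimeoutMs() {
    return getInt(REQUEST_TIMEOUT_MS, DEFAULT_REQUEST_TIMEOUT_MS);
  }

  private int getInt(String key, int defaultValue) {
    String value = config.get(key);
    return value == null ? defaultValue : Integer.parseInt(value.trim());
  }
}
```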


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java, line 99
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055363#file1055363line99>
> >
> >     Do we need a default value here as well? What's the right behavior if this config variable is not configured? Do we always default to whatever the default Java is on the AM machine? That sounds a bit concerning to me.

No. In this case, the null check happens at the caller (See Line 69 in my changes to YarnJob.scala). This was the behavior when this class was in scala and I just translated the same to Java. 

If the option is not configured, it defaults to whatever is defined as JAVA_HOME on the AM machine. Why is this a concern? I think a Yarn cluster installation requires JAVA_HOME to be defined.
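The caller-side fallback can be sketched as follows, assuming the config value is a Java home directory. The helper name is hypothetical (per the reply above, the actual null check is in YarnJob.scala); when the value is unset, the launch command refers to `$JAVA_HOME` and leaves it to the node's shell to expand.

```java
// Hypothetical helper mirroring the caller-side null check described above.
final class JavaBinaryResolver {
  private JavaBinaryResolver() {}

  /**
   * @param configuredJavaHome value of the optional Java home config, or null if unset
   * @return path to the java binary used in the container launch command
   */
  static String javaBinary(String configuredJavaHome) {
    if (configuredJavaHome == null || configuredJavaHome.isEmpty()) {
      // Not configured: defer to whatever JAVA_HOME the host defines.
      // The literal "$JAVA_HOME" is expanded by the shell that runs the command.
      return "$JAVA_HOME/bin/java";
    }
    // Strip one trailing slash so we do not produce "//bin/java".
    String home = configuredJavaHome.endsWith("/")
        ? configuredJavaHome.substring(0, configuredJavaHome.length() - 1)
        : configuredJavaHome;
    return home + "/bin/java";
  }
}
```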


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java, line 24
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055363#file1055363line24>
> >
> >     nit: would be nice to add javadoc here.

Ok. I have copied the javadoc from the configuration table.


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java, line 32
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055364#file1055364line32>
> >
> >     nit: javadoc.

I added a brief intro to this class. I have explained the details in the javadocs for the allocator threads.


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java, line 37
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055364#file1055364line37>
> >
> >     For better code re-use and readability, I think that it might be worth thinking of creating two derived classes, instead of using a boolean flag in this class?

Hmm.. That was my initial thought. Let me try to make the classes derived and see if it looks better.


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java, line 52
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055364#file1055364line52>
> >
> >     It would be good to document the overall threading model here. It seems that the following are the main threads involved (correct me if I am wrong):
> >     - The main thread that drives the AM to send out container requests to the RM
> >     - The allocator thread here, which assigns the allocated containers to pending requests
> >     - The callback handler thread, which receives the responses from the RM and populates the allocated containers collection in containerRequestState
> >     - And the SamzaTaskManager handler thread, which handles container failures and re-requests the containers from the RM
> >     
> >     It would be nice to document the above a little bit here so that we have a clear picture of which data structures are shared among which threads.

Yep. You got it right!

I will document the various threads involved in the SamzaTaskManager class. It is going to be a little tough to explain which data structures are shared among threads. I have already mentioned them in the ContainerRequestState class.


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java, line 71
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055364#file1055364line71>
> >
> >     Quick question: it seems the code here takes the available containers at the beginning of the loop and loops through the pending requests for allocation. What if the callback handler tries to add to the availableContainers at the same time? Is the List<Container> thread safe? I think the code here still works due to the producer/consumer model on the same structure here, as long as the remove/add of the first element in the List<> does not collide w/ each other.

Even if the callback handler thread adds to availableContainers at the same time, it should not cause any problem because adding containers (in addContainer) and removing containers from the list are both within synchronized blocks. 
Now, it could happen that after we read an empty list of allocated containers on the host, the callback thread adds a container. Even though we may not enter the while-loop for assignment, that container will be considered once the allocator thread wakes up after ALLOCATOR_SLEEP_TIME ms.

Add & remove are synchronized. I don't think it matters whether we synchronize on read for List<Container>. Let me know if you think it is essential here.
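The producer/consumer argument above can be sketched as follows, assuming a single flat list of allocated container IDs instead of the per-host structures in ContainerRequestState. `SharedContainerState` and `AllocatorLoop` are hypothetical names, and the sleep interval stands in for ALLOCATOR_SLEEP_TIME.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified shared state: allocated-but-unassigned container IDs.
class SharedContainerState {
  private final List<String> allocated = new ArrayList<>();

  // Producer: called from the RM callback thread.
  synchronized void addContainer(String containerId) {
    allocated.add(containerId);
  }

  // Consumer: called from the allocator thread; null if nothing is available yet.
  synchronized String takeContainer() {
    return allocated.isEmpty() ? null : allocated.remove(0);
  }
}

// Allocator loop: drain what is available, then sleep. A container added right
// after the allocator saw an empty list is simply handled on the next pass.
class AllocatorLoop implements Runnable {
  private final SharedContainerState state;
  private final long sleepMs;
  private final List<String> assigned = new ArrayList<>();
  private volatile boolean running = true;

  AllocatorLoop(SharedContainerState state, long sleepMs) {
    this.state = state;
    this.sleepMs = sleepMs;
  }

  @Override
  public void run() {
    while (running) {
      String containerId;
      while ((containerId = state.takeContainer()) != null) {
        synchronized (assigned) {
          assigned.add(containerId); // stand-in for launching a SamzaContainer
        }
      }
      try {
        Thread.sleep(sleepMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
    }
  }

  void stop() {
    running = false;
  }

  List<String> assignedSnapshot() {
    synchronized (assigned) {
      return new ArrayList<>(assigned);
    }
  }
}
```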


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java, line 80
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055364#file1055364line80>
> >
> >     Would it be better to wrap this together w/ containerRequestState.updateStateAfterAssignment()? If I understand correctly, this step is to make sure that the request queue in the containerRequestState is in sync w/ the buffered requests in amClient. I think that containerRequestState is also updated when a request is sent asynchronously via amClient. It seems to me that they would be better paired up in two API functions.

Are you suggesting I move the "removeContainerRequest" line within the synchronized block? We are not exactly trying to validate that the request queue in the containerRequestState is in sync with the buffer in the amClient. It doesn't affect the "State" that we maintain. So, I decided to keep it out of the synchronized block. Either way, I don't think it makes a lot of difference. It might make it harder to unit test, though. :)


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java, line 96
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055364#file1055364line96>
> >
> >     At this moment, are we truly running out of containers, or do we simply need to wait for more containers from the RM? I think that we are not sure here. Would it be better to log at info level "Waiting %s more seconds for more containers to be allocated"?

Agreed. I will change it!


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java, line 138
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055364#file1055364line138>
> >
> >     It seems that we can have a common thread class within the ContainerAllocator class and two different classes implementing the non-host-affinity and host-affinity algorithms here. Worth thinking about?

Hmm.. Haven't thought about it because I was worried about messing up the request states. It will be easier to have 2 derived classes - one for host-affinity and one for non-host-affinity. The common functionalities can be in the parent class. Let me think about this common thread class implementation.
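One possible shape for that split, as a sketch only: the shared parts (pending requests, per-host allocated counts, the assignment pass and the sleep loop) sit in an abstract parent, and each strategy overrides a single `chooseHost` hook. All class and method names here are hypothetical, containers are reduced to per-host counts, and the request-timeout fallback from a preferred host to ANY_HOST is left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Common machinery shared by both strategies.
abstract class BaseAllocator implements Runnable {
  static final String ANY_HOST = "ANY_HOST";

  static final class Request {
    final String samzaContainerId;
    final String preferredHost; // ANY_HOST when there is no preference

    Request(String samzaContainerId, String preferredHost) {
      this.samzaContainerId = samzaContainerId;
      this.preferredHost = preferredHost;
    }
  }

  private final Deque<Request> pending = new ArrayDeque<>();
  private final Map<String, Integer> allocatedPerHost = new LinkedHashMap<>();
  private final List<String> launched = new ArrayList<>();
  private final long sleepMs;
  private volatile boolean running = true;

  BaseAllocator(long sleepMs) { this.sleepMs = sleepMs; }

  synchronized void addRequest(Request r) { pending.add(r); }

  // Called from the RM callback thread for each container allocated on `host`.
  synchronized void addAllocated(String host) { allocatedPerHost.merge(host, 1, Integer::sum); }

  synchronized List<String> launched() { return new ArrayList<>(launched); }

  // The only strategy-specific step: pick a host for this request, or null to keep waiting.
  protected abstract String chooseHost(Request r, Map<String, Integer> allocatedPerHost);

  // One assignment pass over all pending requests.
  synchronized void assignOnce() {
    Iterator<Request> it = pending.iterator();
    while (it.hasNext()) {
      Request r = it.next();
      String host = chooseHost(r, allocatedPerHost);
      if (host == null) {
        continue; // wait for the RM to allocate a suitable container
      }
      it.remove();
      allocatedPerHost.merge(host, -1, Integer::sum);
      allocatedPerHost.remove(host, 0); // drop hosts with no containers left
      launched.add(r.samzaContainerId + "@" + host);
    }
  }

  @Override
  public void run() {
    while (running) {
      assignOnce();
      try {
        Thread.sleep(sleepMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
    }
  }

  void stop() { running = false; }
}

// Without host affinity, any allocated container will do.
class SimpleAllocator extends BaseAllocator {
  SimpleAllocator(long sleepMs) { super(sleepMs); }

  @Override
  protected String chooseHost(Request r, Map<String, Integer> allocatedPerHost) {
    return allocatedPerHost.isEmpty() ? null : allocatedPerHost.keySet().iterator().next();
  }
}

// With host affinity, a request waits for its preferred host; ANY_HOST requests take any host.
class HostAffinityAllocator extends BaseAllocator {
  HostAffinityAllocator(long sleepMs) { super(sleepMs); }

  @Override
  protected String chooseHost(Request r, Map<String, Integer> allocatedPerHost) {
    if (allocatedPerHost.containsKey(r.preferredHost)) {
      return r.preferredHost;
    }
    if (ANY_HOST.equals(r.preferredHost) && !allocatedPerHost.isEmpty()) {
      return allocatedPerHost.keySet().iterator().next();
    }
    return null;
  }
}
```

The state and locking stay in one place, so the two subclasses cannot drift apart in how they update it.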


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java, line 301
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055364#file1055364line301>
> >
> >     If amClient.addContainerRequest() is in the same sync block w/ containerRequestState.updateRequestState() here, I would think that amClient.removeContainerRequest() should be in the same sync block w/ containerRequestState.updateRequestState in the allocator threads too.

Yeah.. That's a reasonable argument.
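A minimal sketch of that pairing, assuming a stand-in interface in place of the real AM client. YARN's AMRMClient does expose addContainerRequest and removeContainerRequest, but here hosts are plain strings so the sketch runs without a cluster; `RmRequestClient` and `RequestBookkeeping` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for the AM client's request buffer.
interface RmRequestClient {
  void addContainerRequest(String host);
  void removeContainerRequest(String host);
}

// Both directions update the client's buffered requests and the local bookkeeping
// under the same lock, so other threads never observe one change without the other.
class RequestBookkeeping {
  private final RmRequestClient client;
  private final Map<String, Integer> outstandingPerHost = new HashMap<>();

  RequestBookkeeping(RmRequestClient client) {
    this.client = client;
  }

  synchronized void request(String host) {
    client.addContainerRequest(host);
    outstandingPerHost.merge(host, 1, Integer::sum);
  }

  // Called by an allocator thread once a container on `host` has been assigned.
  synchronized void fulfilled(String host) {
    Integer count = outstandingPerHost.get(host);
    if (count == null) {
      throw new IllegalStateException("no outstanding request for " + host);
    }
    client.removeContainerRequest(host);
    if (count == 1) {
      outstandingPerHost.remove(host);
    } else {
      outstandingPerHost.put(host, count - 1);
    }
  }

  synchronized int outstanding(String host) {
    return outstandingPerHost.getOrDefault(host, 0);
  }
}
```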


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java, line 314
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055364#file1055364line314>
> >
> >     Just a thought, maybe directly declaring ContainerRequestState.addContainer() to be a synchronized method if it always requires global lock on containerRequestState?

Sure. Making it synchronized should work. However, we will end up with some methods being synchronized and some not in ContainerRequestState. That is why I decided to leave the synchronized blocks. 

I don't have a strong opinion on this. I can make it synchronized if you think it makes the code better!
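For comparison, a minimal sketch (hypothetical class name) showing that the two styles are interchangeable: a synchronized instance method locks `this`, which is the same monitor a caller takes with `synchronized (state) { ... }`.

```java
import java.util.ArrayList;
import java.util.List;

class RequestStateSketch {
  private final List<String> containers = new ArrayList<>();

  // Option 1: the method itself takes this object's monitor.
  synchronized void addContainer(String id) {
    containers.add(id);
  }

  // Option 2: the same monitor taken explicitly with a synchronized block.
  void addContainerWithBlock(String id) {
    synchronized (this) {
      containers.add(id);
    }
  }

  synchronized int size() {
    return containers.size();
  }
}
```

Because Java monitors are reentrant, a thread that already holds the lock in a `synchronized (state)` block can still call the synchronized methods, so mixing the two styles does not deadlock; the choice is mostly about readability.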


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java, line 34
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055366#file1055366line34>
> >
> >     This is defined redundantly in both ContainerAllocator and here. It would be good to just keep one.

Ok. Makes sense.


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java, line 57
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055366#file1055366line57>
> >
> >     I see the opportunity to remove this conditional flag and split this state into two derived classes as well. Thoughts?

Ah.. Ok. For now, I will try to refactor the allocator code with threads :) I will keep this opportunity reserved as a TODO in the code.


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java, line 112
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055366#file1055366line112>
> >
> >     So, here to make sure that the request count and the allocated containers count are not changed in the comparison, access of the two variables and update of the two variables need to be atomic. Is it guaranteed by the global lock on the state?
> >     
> >     Or, I guess the worst result here is that the requestCountOnThisHost has not been updated while the container is allocated. Hence, one allocated container goes to ANY_HOST and misses the chance of being allocated to the preferred host? We may live w/ this if we want to avoid locking here for performance. It would be good to make a note here.

Yeah. The global local on the state object should take care of this. I don't think we need to do any kind of locking here. It will lead to double locking issues.
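To make the locking argument concrete, here is a minimal sketch, assuming per-host request and allocation bookkeeping similar to what ContainerRequestState keeps (the class and method names are hypothetical). The comparison and the insert happen inside one synchronized method, so the callback thread cannot interleave between them, and the private helpers document that the caller must already hold the lock.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified version of the per-host bookkeeping in ContainerRequestState.
class HostAwareState {
  static final String ANY_HOST = "ANY_HOST";

  private final Map<String, Integer> requestsPerHost = new HashMap<>();
  private final Map<String, List<String>> allocatedPerHost = new HashMap<>();

  synchronized void addRequest(String host) {
    requestsPerHost.merge(host, 1, Integer::sum);
  }

  // Called from the RM callback thread. Reading both counts and inserting the
  // container happen under the same lock, so neither count can change in between.
  synchronized void addContainer(String host, String containerId) {
    boolean wanted = allocatedCountOnHost(host) < requestCountOnHost(host);
    // Containers beyond what we requested on this host go to the ANY_HOST pool.
    String bucket = wanted ? host : ANY_HOST;
    allocatedPerHost.computeIfAbsent(bucket, k -> new ArrayList<>()).add(containerId);
  }

  synchronized List<String> snapshot(String bucket) {
    List<String> containers = allocatedPerHost.get(bucket);
    return containers == null ? new ArrayList<String>() : new ArrayList<String>(containers);
  }

  // Private helpers: the caller must already hold the lock on this object.
  private int requestCountOnHost(String host) {
    return requestsPerHost.getOrDefault(host, 0);
  }

  private int allocatedCountOnHost(String host) {
    List<String> containers = allocatedPerHost.get(host);
    return containers == null ? 0 : containers.size();
  }
}
```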


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java, line 136
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055366#file1055366line136>
> >
> >     It would be good to comment on under which lock this method should be called.

I made this a private method to the class (will be available in the next RB patch) because it is just a helper method. Not intended for any client to directly call it.


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/scala/org/apache/samza/job/yarn/ContainerUtil.scala, line 19
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055372#file1055372line19>
> >
> >     Any reason this class is still in scala? If it is new/refactored, would it make more sense to change it to java?

:) At one point, I just got tired of moving things to java :P It is hard to find equivalents of certain scala aspects in java. 

If this class doesn't use any functions as parameters, I will move it to java. Otherwise, I prefer to keep it this way for now.


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala, line 174
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055384#file1055384line174>
> >
> >     I thought that the purpose of these tests is orthogonal to the change in host-affinity and they would need to be kept here?

So, I nuked the SamzaAppMasterTaskManager.scala class. Part of the operations of that class is now in SamzaTaskManager.java and the rest in ContainerAllocator.java. 
I have added unit tests for each of those classes covering all relevant cases from TestSamzaAppMasterTaskManager.scala. I will update this patch with those changes.


- Navina


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37817/#review97059
-----------------------------------------------------------


On Aug. 26, 2015, 10:14 p.m., Navina Ramesh wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/37817/
> -----------------------------------------------------------
> 
> (Updated Aug. 26, 2015, 10:14 p.m.)
> 
> 
> Review request for samza, Yan Fang, Chinmay Soman, Chris Riccomini, and Yi Pan (Data Infrastructure).
> 
> 
> Bugs: SAMZA-619
>     https://issues.apache.org/jira/browse/SAMZA-619
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> This is a major change to how we request and assign containers to resources. It uses a threaded model for request and allocation. More comments in the javadoc. 
> 
> Major changes to note:
> 1. Changed yarn dependency to 2.6
> 2. Moved YarnConfig to java config class
> 3. Removed SamzaAppMasterTaskManager
> 4. SamzaAppState replaces SamzaAppMasterState
> 5. SamzaTaskManager is very similar to SamzaAppMasterTaskManager, except that it delegates onContainerAllocated and requestContainers to the thread in ContainerAllocator 
> 6. Removed state.unclaimedContainers
> 7. Allocator thread sleep time and request timeout are configurable
> 8. host-affinity can be enabled by using the config "yarn.samza.host-affinity.enabled". Host affinity is guaranteed to work with FairScheduler that has continuous scheduling enabled. Details on this config can be found at SAMZA-617 [design](https://issues.apache.org/jira/secure/attachment/12726945/DESIGN-SAMZA-617-2.pdf).
> 
> Pending items:
> 1. Adding unit tests for AppMaster, TaskManager, ContainerAllocator
> 2. Update config documentation 
> 3. Update web-site with info on this feature
> 
> 
> Diffs
> -----
> 
>   build.gradle 3a7fabcd4f4e5c8db950c3a33bba33618c5565b4 
>   gradle/dependency-versions.gradle 36d564b6ca895f042ee4802643e49180f4947b62 
>   samza-core/src/main/java/org/apache/samza/container/LocalityManager.java c567bf4a1747a7a6aff23be74cf25b1a44e57b42 
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java 5455881e2a4a9c865137f2708f655641b93c2bfb 
>   samza-core/src/main/java/org/apache/samza/job/model/JobModel.java 7b592740c082225fc217a24188bc874f883db63b 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 61e228b1ff8619da4ad2e11b15a5afb7999c4996 
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala c8438bd6ed01a95860867505d84c04c493a5a197 
>   samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerFailure.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java PRE-CREATION 
>   samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml ce2145adde09b0e8097a1c711bc296ee20392e63 
>   samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala 1f51551893c42bb13d7941c8b0e5594ac7f42585 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/ContainerUtil.scala PRE-CREATION 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala af42c6a6636953a95f79837fe372e0dbd735df70 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala d475c4c566f17f88a04db5fbc84cc2e27eb333d2 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala 03acfe1bbbabf8f54be9f36fdae785476da45135 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala 060538623e4d67b986bc635518e7fe8ebdde9e24 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala f667c83a7fff43a5efdc64cde019b2cf35f38cb9 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala 1743c8611a94fa1c7f7dafd8ff8c713039f3df8c 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala 8dd70c9977473e083e463d01b049c40e15b21f4a 
>   samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala 09f4dc32a4b18aeb3accb856c360ea2f95c82673 
>   samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala 7fd51221149ba59e7ea1bc0b51d58726ec17a7d7 
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala df5992e659302d2918c4e2c30b6122ed51ab9fe8 
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala 6f4bfaf916d687426f746fd3b09cd56490bb500e 
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 2eec65f02826de40493925c08ff344a8cc4feecb 
> 
> Diff: https://reviews.apache.org/r/37817/diff/
> 
> 
> Testing
> -------
> 
> Tested with hello-samza locally. Tested with a sample job on a 3-node Yarn cluster. Works as expected.
> 
> 
> Thanks,
> 
> Navina Ramesh
> 
>


Re: Review Request 37817: SAMZA-619 - Modify SamzaAppMaster to enable host-affinity

Posted by Navina Ramesh <nr...@linkedin.com>.

> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java, line 112
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055366#file1055366line112>
> >
> >     So, here to make sure that the request count and the allocated containers count are not changed in the comparison, access of the two variables and update of the two variables need to be atomic. Is it guaranteed by the global lock on the state?
> >     
> >     Or, I guess the worst result here is that the requestCountOnThisHost has not been updated while the container is allocated. Hence, one allocated container goes to ANY_HOST and misses the chance of being allocated to the preferred host? We may live w/ this if we want to avoid locking here for performance. It would be good to make a note here.
> 
> Navina Ramesh wrote:
>     Yeah. The global local on the state object should take care of this. I don't think we need to do any kind of locking here. It will lead to double locking issues.

Correction: I meant "global lock", not "global local"


- Navina


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37817/#review97059
-----------------------------------------------------------


Re: Review Request 37817: SAMZA-619 - Modify SamzaAppMaster to enable host-affinity

Posted by "Yi Pan (Data Infrastructure)" <yi...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37817/#review97059
-----------------------------------------------------------


Overall looks good to me. I have a few documentation/code reorganization suggestions. Thanks a lot!


samza-core/src/main/java/org/apache/samza/container/LocalityManager.java (line 82)
<https://reviews.apache.org/r/37817/#comment152777>

    What's the reason to have both hostname and hostip here? Some javadoc would be nice



samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java (line 24)
<https://reviews.apache.org/r/37817/#comment152820>

    nit: would be nice to add javadoc here.



samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java (line 51)
<https://reviews.apache.org/r/37817/#comment152821>

    Just curious: why are we choosing 3.6s as the sleep time here? Is it from experience?



samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java (line 99)
<https://reviews.apache.org/r/37817/#comment152822>

    Do we need a default value here as well? What's the right behavior if this config variable is not configured? Do we always default to whatever the default Java is on the AM machine? That sounds a bit concerning to me.



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java (line 32)
<https://reviews.apache.org/r/37817/#comment152823>

    nit: javadoc.



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java (line 37)
<https://reviews.apache.org/r/37817/#comment152824>

    For better code re-use and readability, I think that it might be worth thinking of creating two derived classes, instead of using a boolean flag in this class?



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java (line 52)
<https://reviews.apache.org/r/37817/#comment152829>

    It would be good to document the overall threading model here. It seems that the following are the main threads involved (correct me if I am wrong):
    - The main thread that drives the AM to send out container requests to the RM
    - The allocator thread here, which assigns the allocated containers to pending requests
    - The callback handler thread, which receives the responses from the RM and populates the allocated containers collection in containerRequestState
    - And the SamzaTaskManager handler thread, which handles container failures and re-requests the containers from the RM
    
    It would be nice to document the above a little bit here so that we have a clear picture of which data structures are shared among which threads.



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java (line 71)
<https://reviews.apache.org/r/37817/#comment152826>

    Quick question: it seems the code here takes the available containers at the beginning of the loop and loops through the pending requests for allocation. What if the callback handler tries to add to the availableContainers at the same time? Is the List<Container> thread safe? I think the code here still works due to the producer/consumer model on the same structure here, as long as the remove/add of the first element in the List<> does not collide w/ each other.



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java (line 80)
<https://reviews.apache.org/r/37817/#comment152825>

    Would it be better to wrap this together w/ containerRequestState.updateStateAfterAssignment()? If I understand correctly, this step is to make sure that the request queue in the containerRequestState is in sync w/ the buffered requests in amClient. I think that containerRequestState is also updated when a request is sent asynchronously via amClient. It seems to me that they would be better paired up in two API functions.



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java (line 96)
<https://reviews.apache.org/r/37817/#comment152827>

    At this moment, are we truly running out of containers, or do we simply need to wait for more containers from the RM? I think that we are not sure here. Would it be better to log at info level "Waiting %s more seconds for more containers to be allocated"?



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java (line 138)
<https://reviews.apache.org/r/37817/#comment152828>

    It seems that we can have a common thread class within the ContainerAllocator class and two different classes implementing the non-host-affinity and host-affinity algorithms here. Worth thinking about?



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java (line 158)
<https://reviews.apache.org/r/37817/#comment152858>

    Similar comments here as in the non-host-affinity allocator



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java (line 301)
<https://reviews.apache.org/r/37817/#comment152876>

    If amClient.addContainerRequest() is in the same sync block w/ containerRequestState.updateRequestState() here, I would think that amClient.removeContainerRequest() should be in the same sync block w/ containerRequestState.updateRequestState in the allocator threads too.



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java (line 314)
<https://reviews.apache.org/r/37817/#comment152891>

    Just a thought, maybe directly declaring ContainerRequestState.addContainer() to be a synchronized method if it always requires global lock on containerRequestState?



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java (line 34)
<https://reviews.apache.org/r/37817/#comment152879>

    This is defined redundantly in both ContainerAllocator and here. It would be good to just keep one.



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java (line 57)
<https://reviews.apache.org/r/37817/#comment152882>

    I see the opportunity to remove this conditional flag and split this state into two derived classes as well. Thoughts?



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java (line 68)
<https://reviews.apache.org/r/37817/#comment152884>

    nit: it would be good to mention under which lock the following operation is performed as well.



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java (line 94)
<https://reviews.apache.org/r/37817/#comment152886>

    nit: Under which lock is the method called? It would be better for readability to add this information in the javadoc.



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java (line 112)
<https://reviews.apache.org/r/37817/#comment152889>

    So, here to make sure that the request count and the allocated containers count are not changed in the comparison, access of the two variables and update of the two variables need to be atomic. Is it guaranteed by the global lock on the state?
    
    Or, I guess the worst result here is that the requestCountOnThisHost has not been updated while the container is allocated. Hence, one allocated container goes to ANY_HOST and misses the chance of being allocated to the preferred host? We may live w/ this if we want to avoid locking here for performance. It would be good to make a note here.



samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java (line 136)
<https://reviews.apache.org/r/37817/#comment152892>

    It would be good to comment on which lock this method should be called under.



samza-yarn/src/main/scala/org/apache/samza/job/yarn/ContainerUtil.scala (line 19)
<https://reviews.apache.org/r/37817/#comment152901>

    Any reason this class is still in Scala? If it is new or refactored, would it make more sense to convert it to Java?



samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala (line 173)
<https://reviews.apache.org/r/37817/#comment152902>

    I thought the purpose of these tests is orthogonal to the host-affinity change, so shouldn't they be kept here?


- Yi Pan (Data Infrastructure)


On Aug. 26, 2015, 10:14 p.m., Navina Ramesh wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/37817/
> -----------------------------------------------------------
> 
> (Updated Aug. 26, 2015, 10:14 p.m.)
> 
> 
> Review request for samza, Yan Fang, Chinmay Soman, Chris Riccomini, and Yi Pan (Data Infrastructure).
> 
> 
> Bugs: SAMZA-619
>     https://issues.apache.org/jira/browse/SAMZA-619
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> This is a major change to how we request and assign containers to resources. It uses a threaded model for request and allocation. More details are in the javadoc. 
> 
> Major changes to note:
> 1. Changed YARN dependency to 2.6
> 2. Moved YarnConfig to a Java config class
> 3. Removed SamzaAppMasterTaskManager
> 4. SamzaAppState replaces SamzaAppMasterState
> 5. SamzaTaskManager is very similar to SamzaAppMasterTaskManager, except that it delegates onContainerAllocated and requestContainers to the ContainerAllocator thread
> 6. Removed state.unclaimedContainers
> 7. Allocator thread sleep time and request timeout are configurable
> 8. Host affinity can be enabled with the config "yarn.samza.host-affinity.enabled". Host affinity is guaranteed to work with the FairScheduler when continuous scheduling is enabled. Details on this config can be found in the SAMZA-617 [design](https://issues.apache.org/jira/secure/attachment/12726945/DESIGN-SAMZA-617-2.pdf).
> 
> Pending items:
> 1. Add unit tests for AppMaster, TaskManager, and ContainerAllocator
> 2. Update config documentation
> 3. Update the website with info on this feature
> 
> 
> Diffs
> -----
> 
>   build.gradle 3a7fabcd4f4e5c8db950c3a33bba33618c5565b4 
>   gradle/dependency-versions.gradle 36d564b6ca895f042ee4802643e49180f4947b62 
>   samza-core/src/main/java/org/apache/samza/container/LocalityManager.java c567bf4a1747a7a6aff23be74cf25b1a44e57b42 
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java 5455881e2a4a9c865137f2708f655641b93c2bfb 
>   samza-core/src/main/java/org/apache/samza/job/model/JobModel.java 7b592740c082225fc217a24188bc874f883db63b 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 61e228b1ff8619da4ad2e11b15a5afb7999c4996 
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala c8438bd6ed01a95860867505d84c04c493a5a197 
>   samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerFailure.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java PRE-CREATION 
>   samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml ce2145adde09b0e8097a1c711bc296ee20392e63 
>   samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala 1f51551893c42bb13d7941c8b0e5594ac7f42585 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/ContainerUtil.scala PRE-CREATION 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala af42c6a6636953a95f79837fe372e0dbd735df70 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala d475c4c566f17f88a04db5fbc84cc2e27eb333d2 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala 03acfe1bbbabf8f54be9f36fdae785476da45135 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala 060538623e4d67b986bc635518e7fe8ebdde9e24 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala f667c83a7fff43a5efdc64cde019b2cf35f38cb9 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala 1743c8611a94fa1c7f7dafd8ff8c713039f3df8c 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala 8dd70c9977473e083e463d01b049c40e15b21f4a 
>   samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala 09f4dc32a4b18aeb3accb856c360ea2f95c82673 
>   samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala 7fd51221149ba59e7ea1bc0b51d58726ec17a7d7 
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala df5992e659302d2918c4e2c30b6122ed51ab9fe8 
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala 6f4bfaf916d687426f746fd3b09cd56490bb500e 
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 2eec65f02826de40493925c08ff344a8cc4feecb 
> 
> Diff: https://reviews.apache.org/r/37817/diff/
> 
> 
> Testing
> -------
> 
> Tested with hello-samza locally. Tested with a sample job on a 3-node YARN cluster. Works as expected.
> 
> 
> Thanks,
> 
> Navina Ramesh
> 
>


Re: Review Request 37817: SAMZA-619 - Modify SamzaAppMaster to enable host-affinity

Posted by Navina Ramesh <nr...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37817/
-----------------------------------------------------------

(Updated Aug. 26, 2015, 10:14 p.m.)


Review request for samza, Yan Fang, Chinmay Soman, Chris Riccomini, and Yi Pan (Data Infrastructure).


Bugs: SAMZA-619
    https://issues.apache.org/jira/browse/SAMZA-619


Repository: samza


Description (updated)
-------

This is a major change to how we request and assign containers to resources. It uses a threaded model for request and allocation. More details are in the javadoc. 
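To make the threaded request/allocation model easier to picture, here is a hypothetical sketch of an allocator loop using the configurable sleep time and request timeout listed among the major changes. The class, its fields and the matching policy (preferred host first, any host after the timeout) are illustrative assumptions, not the actual ContainerAllocator; the YARN callbacks that deliver allocations are not modeled.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical sketch of an allocator thread; names and policy are assumptions.
class AllocatorSketch implements Runnable {
  static final class Request {
    final int samzaContainerId;  // which Samza container to launch
    final String preferredHost;  // host it ran on last time
    final long requestTimeMs;    // when the request was issued
    Request(int samzaContainerId, String preferredHost, long requestTimeMs) {
      this.samzaContainerId = samzaContainerId;
      this.preferredHost = preferredHost;
      this.requestTimeMs = requestTimeMs;
    }
  }

  private final long sleepTimeMs;
  private final long requestTimeoutMs;
  private final Deque<Request> pendingRequests = new ArrayDeque<>();
  private final Map<String, Deque<String>> allocatedByHost = new HashMap<>(); // host -> YARN container IDs
  private final Map<Integer, String> launchedOn = new HashMap<>();            // Samza container ID -> host
  private volatile boolean running = true;

  AllocatorSketch(long sleepTimeMs, long requestTimeoutMs) {
    this.sleepTimeMs = sleepTimeMs;
    this.requestTimeoutMs = requestTimeoutMs;
  }

  synchronized void addRequest(Request request) {
    pendingRequests.add(request);
  }

  // Called from the resource-manager callback thread; it only records the container.
  synchronized void onContainerAllocated(String host, String yarnContainerId) {
    allocatedByHost.computeIfAbsent(host, h -> new ArrayDeque<>()).add(yarnContainerId);
  }

  synchronized String launchedHost(int samzaContainerId) {
    return launchedOn.get(samzaContainerId);
  }

  // One iteration: match each pending request to a container on its preferred
  // host, or to a container on any host once the request has timed out.
  synchronized void runOnce(long nowMs) {
    Iterator<Request> it = pendingRequests.iterator();
    while (it.hasNext()) {
      Request request = it.next();
      String host = request.preferredHost;
      if (!hasContainer(host) && nowMs - request.requestTimeMs >= requestTimeoutMs) {
        host = anyHostWithContainer();
      }
      if (host != null && hasContainer(host)) {
        allocatedByHost.get(host).poll(); // claim one YARN container on that host
        launchedOn.put(request.samzaContainerId, host);
        it.remove();
      }
    }
  }

  private boolean hasContainer(String host) {
    Deque<String> containers = allocatedByHost.get(host);
    return containers != null && !containers.isEmpty();
  }

  private String anyHostWithContainer() {
    for (Map.Entry<String, Deque<String>> entry : allocatedByHost.entrySet()) {
      if (!entry.getValue().isEmpty()) {
        return entry.getKey();
      }
    }
    return null;
  }

  @Override
  public void run() {
    while (running) {
      runOnce(System.currentTimeMillis());
      try {
        Thread.sleep(sleepTimeMs); // the configurable allocator sleep time
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
    }
  }

  void stop() {
    running = false;
  }
}
```

Keeping the matching in runOnce(nowMs) separate from the sleeping loop makes the policy testable with explicit timestamps, without starting a thread.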

Major changes to note:
1. Changed YARN dependency to 2.6
2. Moved YarnConfig to a Java config class
3. Removed SamzaAppMasterTaskManager
4. SamzaAppState replaces SamzaAppMasterState
5. SamzaTaskManager is very similar to SamzaAppMasterTaskManager, except that it delegates onContainerAllocated and requestContainers to the ContainerAllocator thread
6. Removed state.unclaimedContainers
7. Allocator thread sleep time and request timeout are configurable
8. Host affinity can be enabled with the config "yarn.samza.host-affinity.enabled". Host affinity is guaranteed to work with the FairScheduler when continuous scheduling is enabled. Details on this config can be found in the SAMZA-617 [design](https://issues.apache.org/jira/secure/attachment/12726945/DESIGN-SAMZA-617-2.pdf).
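A hypothetical sketch of how the "yarn.samza.host-affinity.enabled" key from the host-affinity item above might be read from a job config represented as a Map<String, String>. The helper class and the default of false are assumptions; the real YarnConfig accessor may differ.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; the real YarnConfig accessor may be named and behave differently.
final class HostAffinityConfig {
  static final String HOST_AFFINITY_ENABLED = "yarn.samza.host-affinity.enabled";

  private HostAffinityConfig() {}

  /** True only if the key is set to "true" (case-insensitive); assumed default is false. */
  static boolean isHostAffinityEnabled(Map<String, String> config) {
    return Boolean.parseBoolean(config.getOrDefault(HOST_AFFINITY_ENABLED, "false"));
  }
}
```

In a job's properties file this corresponds to the line yarn.samza.host-affinity.enabled=true.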

Pending items:
1. Add unit tests for AppMaster, TaskManager, and ContainerAllocator
2. Update config documentation
3. Update the website with info on this feature


Diffs
-----

  build.gradle 3a7fabcd4f4e5c8db950c3a33bba33618c5565b4 
  gradle/dependency-versions.gradle 36d564b6ca895f042ee4802643e49180f4947b62 
  samza-core/src/main/java/org/apache/samza/container/LocalityManager.java c567bf4a1747a7a6aff23be74cf25b1a44e57b42 
  samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java 5455881e2a4a9c865137f2708f655641b93c2bfb 
  samza-core/src/main/java/org/apache/samza/job/model/JobModel.java 7b592740c082225fc217a24188bc874f883db63b 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 61e228b1ff8619da4ad2e11b15a5afb7999c4996 
  samza-core/src/main/scala/org/apache/samza/util/Util.scala c8438bd6ed01a95860867505d84c04c493a5a197 
  samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerFailure.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java PRE-CREATION 
  samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml ce2145adde09b0e8097a1c711bc296ee20392e63 
  samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala 1f51551893c42bb13d7941c8b0e5594ac7f42585 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/ContainerUtil.scala PRE-CREATION 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala af42c6a6636953a95f79837fe372e0dbd735df70 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala d475c4c566f17f88a04db5fbc84cc2e27eb333d2 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala 03acfe1bbbabf8f54be9f36fdae785476da45135 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala 060538623e4d67b986bc635518e7fe8ebdde9e24 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala f667c83a7fff43a5efdc64cde019b2cf35f38cb9 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala 1743c8611a94fa1c7f7dafd8ff8c713039f3df8c 
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala 8dd70c9977473e083e463d01b049c40e15b21f4a 
  samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala 09f4dc32a4b18aeb3accb856c360ea2f95c82673 
  samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala 7fd51221149ba59e7ea1bc0b51d58726ec17a7d7 
  samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala df5992e659302d2918c4e2c30b6122ed51ab9fe8 
  samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala 6f4bfaf916d687426f746fd3b09cd56490bb500e 
  samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 2eec65f02826de40493925c08ff344a8cc4feecb 

Diff: https://reviews.apache.org/r/37817/diff/


Testing
-------

Tested with hello-samza locally. Tested with a sample job on a 3-node YARN cluster. Works as expected.


Thanks,

Navina Ramesh