Posted to dev@samza.apache.org by Navina Ramesh <nr...@linkedin.com> on 2015/09/01 01:15:23 UTC

Re: Review Request 37817: SAMZA-619 - Modify SamzaAppMaster to enable host-affinity


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/container/LocalityManager.java, line 82
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055358#file1055358line82>
> >
> >     What's the reason to have both hostname and hostip here? Some javadoc would be nice

Actually, when I made the change to write the container-to-host mapping to the coordinator stream (in SAMZA-618), I used the hostIp. When I started working on host-affinity, I realized that YARN returns the allocated container with metadata about the host, and that metadata contains the hostname rather than the IP.

I think we can keep only hostName. I doubt we are using hostIp anywhere else.


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java, line 51
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055363#file1055363line51>
> >
> >     Just curious: why are we choosing 3.6s as the sleep time here? Is it from experience?

Not really. I just picked those as defaults, and each of them should be configurable. I will document them in the configuration table. I am not sure what the right defaults are without experimenting with jobs that have varying configs.
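A minimal sketch of how the two knobs could be read with fallbacks, assuming a plain Map<String, String> config. The key names and the 5000 ms request-timeout default are hypothetical placeholders; only the 3.6s (3600 ms) sleep time comes from the review comment above.

```java
import java.util.Map;

/**
 * Allocator timing knobs with defaults.
 * Key names are hypothetical placeholders, not Samza's actual config keys.
 */
final class AllocatorTimingConfig {
    // Hypothetical key names, for illustration only.
    static final String SLEEP_TIME_KEY = "example.allocator.sleep.ms";
    static final String REQUEST_TIMEOUT_KEY = "example.container.request.timeout.ms";

    // 3600 ms mirrors the 3.6s sleep time discussed in the review;
    // the request-timeout default is an arbitrary placeholder.
    static final long DEFAULT_SLEEP_TIME_MS = 3600L;
    static final long DEFAULT_REQUEST_TIMEOUT_MS = 5000L;

    private final Map<String, String> config;

    AllocatorTimingConfig(Map<String, String> config) {
        this.config = config;
    }

    long allocatorSleepTimeMs() {
        return getLong(SLEEP_TIME_KEY, DEFAULT_SLEEP_TIME_MS);
    }

    long containerRequestTimeoutMs() {
        return getLong(REQUEST_TIMEOUT_KEY, DEFAULT_REQUEST_TIMEOUT_MS);
    }

    // Returns the configured value, or the default when the key is absent.
    private long getLong(String key, long defaultValue) {
        String value = config.get(key);
        return value == null ? defaultValue : Long.parseLong(value.trim());
    }
}
```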


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java, line 99
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055363#file1055363line99>
> >
> >     Do we need a default value here as well? What's the right behavior if this config variable is not configured? Do we always default to whatever the default Java is on the AM machine? That sounds a bit concerning to me.

No. In this case, the null check happens at the caller (see line 69 in my changes to YarnJob.scala). This was the behavior when this class was in Scala, and I just translated the same logic to Java.

If the option is not configured, it defaults to whatever is defined as JAVA_HOME on the AM machine. Why is this a concern? I think a YARN cluster installation requires JAVA_HOME to be defined.
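A minimal sketch of the caller-side fallback described above. The class and method names are hypothetical, and it assumes the AM launch command is a shell string that YARN expands on the node running the AM, so an unexpanded $JAVA_HOME resolves to that machine's JAVA_HOME.

```java
/**
 * Caller-side fallback when the AM Java home is not configured.
 * Hypothetical names; assumes the launch command is expanded by the shell
 * on the node that runs the AM.
 */
final class AmJavaCommand {
    // Left unexpanded on purpose so it resolves to JAVA_HOME on the AM's node.
    static final String JAVA_HOME_PLACEHOLDER = "$JAVA_HOME";

    private AmJavaCommand() {}

    /** @param configuredJavaHome the configured value, or null when unset */
    static String javaBinary(String configuredJavaHome) {
        // The null check lives here, at the caller, rather than in the config getter.
        String javaHome = (configuredJavaHome == null || configuredJavaHome.isEmpty())
                ? JAVA_HOME_PLACEHOLDER
                : configuredJavaHome;
        return javaHome + "/bin/java";
    }
}
```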


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java, line 24
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055363#file1055363line24>
> >
> >     nit: would be nice to add javadoc here.

Ok. I have copied the javadoc from the configuration table.


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java, line 32
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055364#file1055364line32>
> >
> >     nit: javadoc.

I added a brief intro to this class. I have explained the details in the javadocs for the allocator threads.


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java, line 37
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055364#file1055364line37>
> >
> >     For better code re-use and readability, I think that it might be worth thinking of creating two derived classes, instead of using a boolean flag in this class?

Hmm.. That was my initial thought. Let me try to make the classes derived and see if it looks better.


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java, line 52
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055364#file1055364line52>
> >
> >     It would be good to document the overall threading model here. It seems that the following are the main threads involved (correct me if I am wrong):
> >     - The main thread that drives the AM to send out container requests to RM
> >     - The allocator thread here to assign the allocated containers to pending requests
> >     - The callback handler thread that receives the responses from RM and populates the allocated containers collection in containerRequestState
> >     - And the SamzaTaskManager handler thread that handles container failures and re-requests the containers from RM
> >     
> >     It would be nice to document the above a little bit here s.t. we have a clear picture of which data structure will be shared among which threads.

Yep. You got it right!

I will document the various threads involved in the SamzaTaskManager class. It is going to be a little tough to explain which data structures are shared among threads. I have already described them in the ContainerRequestState class.


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java, line 71
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055364#file1055364line71>
> >
> >     Quick question: it seems the code here tries to take the available containers at the beginning of the loop and looping through the pending requests for allocation. What if the callback handler tries to add to the availableContainers at the same time? Is the List<Container> thread safe? I think the code here still works due to the producer/consumer model on the same structure here, as long as the remove/add of the first element in the List<> does not collide w/ each other.

Even if the callback handler thread adds to availableContainers at the same time, it should not cause any problem, because adding containers (in addContainer) and removing containers from the list both happen within synchronized blocks.
It could happen that the callback thread adds a container right after we read an empty list of allocated containers for the host. Even though we would not enter the while-loop for assignment on that pass, the container will be considered once the allocator thread wakes up again after ALLOCATOR_SLEEP_TIME ms.

Since add and remove are synchronized, I don't think it matters whether we also synchronize the read of the List<Container>. Let me know if you think it is essential here.
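A minimal sketch of the producer/consumer pattern described above: the callback thread adds, the allocator thread removes, both inside synchronized blocks on the same object, and a container that arrives just after an empty check is picked up on the next wake-up. Container ids are plain strings and the class names are hypothetical; this is not the actual ContainerAllocator code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Shared state: add and remove both lock the same object. */
final class AllocatedContainers {
    private final List<String> available = new ArrayList<>();

    // Called from the RM callback handler thread.
    void addContainer(String containerId) {
        synchronized (this) {
            available.add(containerId);
        }
    }

    // Called from the allocator thread; returns null if nothing is available yet.
    String takeContainer() {
        synchronized (this) {
            return available.isEmpty() ? null : available.remove(0);
        }
    }
}

/** Allocator thread: drains what is available, then sleeps and checks again. */
final class AllocatorLoop implements Runnable {
    private final AllocatedContainers state;
    private final long sleepTimeMs;
    // Stand-in for "container launched"; thread-safe so other threads can inspect it.
    private final List<String> assigned = new CopyOnWriteArrayList<>();
    private volatile boolean running = true;

    AllocatorLoop(AllocatedContainers state, long sleepTimeMs) {
        this.state = state;
        this.sleepTimeMs = sleepTimeMs;
    }

    @Override
    public void run() {
        while (running) {
            String container;
            while ((container = state.takeContainer()) != null) {
                assigned.add(container);
            }
            // A container added after the empty check above is handled on the next pass.
            try {
                Thread.sleep(sleepTimeMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void stop() {
        running = false;
    }

    List<String> assigned() {
        return assigned;
    }
}
```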


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java, line 80
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055364#file1055364line80>
> >
> >     Would it be better to wrap this together w/ containerRequestState.updateStateAfterAssignment()? If I understand correctly, this step is to make sure that the request queue in the containerRequestState is in sync w/ the buffered requests in amClient. I think that containerRequestState is also updated when a request is sent async via amClient. It seems to me that they would be better paired up in two API functions.

Are you suggesting I move the "removeContainerRequest" line into the synchronized block? We are not exactly trying to validate that the request queue in the containerRequestState is in sync with the buffer in the amClient, and it doesn't affect the "State" that we maintain. So I decided to keep it outside the synchronized block. Either way, I don't think it makes a lot of difference. It might make it harder to unit test, though. :)


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java, line 96
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055364#file1055364line96>
> >
> >     At this moment, are we truly running out of containers? Or do we simply need to wait for more containers from the RM? I think that we are not sure here. Would it be better to log info "Waiting %s more seconds for more containers to be allocated"?

Agreed. I will change it!


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java, line 138
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055364#file1055364line138>
> >
> >     It seems that we can have a common thread class within the ContainerAllocator class and two different classes implementing the non-host-affinity and host-affinity algorithms here. Worth thinking about?

Hmm.. Haven't thought about it because I was worried about messing up the request states. It will be easier to have 2 derived classes - one for host-affinity and one for non-host-affinity. The common functionalities can be in the parent class. Let me think about this common thread class implementation.
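A rough sketch of the "common parent, two derived classes" shape, assuming a simplified model where each pending request is just its preferred host. The class names are hypothetical, and the host-affinity rule is deliberately reduced (match the host, else consume an ANY_HOST request); the request timeout from the patch is omitted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Shared bookkeeping lives in the parent; each subclass supplies its matching rule. */
abstract class AbstractContainerAllocator {
    static final String ANY_HOST = "ANY_HOST";

    // Pending requests, each represented by its preferred host (ANY_HOST when none).
    protected final Deque<String> pendingPreferredHosts = new ArrayDeque<>();
    protected final List<String> assignments = new ArrayList<>();

    void request(String preferredHost) {
        pendingPreferredHosts.add(preferredHost);
    }

    // Common functionality: record which container satisfied a request.
    protected void assign(String containerId, String host) {
        assignments.add(containerId + "@" + host);
    }

    // Algorithm-specific part, replacing a boolean host-affinity flag.
    abstract void onContainerAllocated(String containerId, String host);
}

/** Without host affinity: any container satisfies the oldest pending request. */
final class DefaultContainerAllocator extends AbstractContainerAllocator {
    @Override
    void onContainerAllocated(String containerId, String host) {
        if (pendingPreferredHosts.poll() != null) {
            assign(containerId, host);
        }
    }
}

/** With host affinity: prefer a request for this host, else an ANY_HOST request. */
final class HostAwareContainerAllocator extends AbstractContainerAllocator {
    @Override
    void onContainerAllocated(String containerId, String host) {
        if (pendingPreferredHosts.remove(host) || pendingPreferredHosts.remove(ANY_HOST)) {
            assign(containerId, host);
        }
    }
}
```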


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java, line 301
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055364#file1055364line301>
> >
> >     If amClient.addContainerRequest() is in the same sync block w/ containerRequestState.updateRequestState() here, I would think that amClient.removeContainerRequest() should be in the same sync block w/ containerRequestState.updateRequestState in the allocator threads too.

Yeah.. That's a reasonable argument.
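A minimal sketch of the pairing suggested above, where each change to the client-side request buffer happens in the same synchronized method as the matching state update. RequestBuffer is a hypothetical stand-in for the two AMRMClient methods (addContainerRequest/removeContainerRequest), using String requests so the example is self-contained.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the request-buffering half of an AM-RM client. */
interface RequestBuffer {
    void addContainerRequest(String request);
    void removeContainerRequest(String request);
}

/** Keeps local request state and the client buffer changing together under one lock. */
final class RequestState {
    private final RequestBuffer amClient;
    private final List<String> pending = new ArrayList<>();

    RequestState(RequestBuffer amClient) {
        this.amClient = amClient;
    }

    // Issuing a request: local state and client buffer are updated atomically.
    synchronized void addRequest(String request) {
        pending.add(request);
        amClient.addContainerRequest(request);
    }

    // Fulfilling a request: the symmetric pair, under the same lock.
    synchronized void fulfillRequest(String request) {
        if (pending.remove(request)) {
            amClient.removeContainerRequest(request);
        }
    }

    synchronized int pendingCount() {
        return pending.size();
    }
}
```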


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java, line 314
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055364#file1055364line314>
> >
> >     Just a thought, maybe directly declaring ContainerRequestState.addContainer() to be a synchronized method if it always requires global lock on containerRequestState?

Sure. Making it synchronized should work. However, we would end up with some methods in ContainerRequestState being synchronized and some not. That is why I decided to keep the synchronized blocks.

I don't have a strong opinion on this. I can make it synchronized if you think it makes the code better!
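For reference, a synchronized instance method locks the same monitor (this) as a synchronized (this) block, so mixing the two styles, or callers that lock the state object externally, still excludes each other. A minimal sketch with hypothetical names:

```java
import java.util.ArrayList;
import java.util.List;

/** Both forms below acquire the monitor of `this`, so they are mutually exclusive. */
final class ContainerStateSketch {
    private final List<String> allocated = new ArrayList<>();

    // Synchronized-method form.
    synchronized void addContainer(String containerId) {
        allocated.add(containerId);
    }

    // Equivalent synchronized-block form.
    void addContainerWithBlock(String containerId) {
        synchronized (this) {
            allocated.add(containerId);
        }
    }

    synchronized int size() {
        return allocated.size();
    }
}
```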


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java, line 34
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055366#file1055366line34>
> >
> >     This is defined redundantly in both ContainerAllocator and here. It would be good to just keep one.

Ok. Makes sense.


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java, line 57
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055366#file1055366line57>
> >
> >     I see the opportunity to remove this conditional flag and split this state into two derived classes as well. Thoughts?

Ah.. Ok. For now, I will try to refactor the allocator code with threads :) I will keep this opportunity reserved as a TODO in the code.


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java, line 112
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055366#file1055366line112>
> >
> >     So, here to make sure that the request count and the allocated containers count are not changed in the comparison, access of the two variables and update of the two variables need to be atomic. Is it guaranteed by the global lock on the state?
> >     
> >     Or, I guess the worst result from here is that the requestCountOnThisHost has not been updated while the container is allocated. Hence, one allocated container goes to ANY_HOST and missed the chance of being allocated to the preferred host? We may live w/ this if we want to avoid locking here for performance. It would be good to make a note here.

Yeah. The global lock on the state object should take care of this. I don't think we need any additional locking here; it would lead to double-locking issues.
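A minimal sketch of the comparison discussed above, with the request count and the allocated count both read and updated inside one synchronized method, so a surplus container for a host goes to ANY_HOST. The names and the simplified bookkeeping are hypothetical; the private helper is only called while the lock is already held.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class HostRequestState {
    static final String ANY_HOST = "ANY_HOST";

    private final Map<String, Integer> requestCountPerHost = new HashMap<>();
    private final Map<String, List<String>> allocatedPerHost = new HashMap<>();

    synchronized void addRequest(String preferredHost) {
        requestCountPerHost.merge(preferredHost, 1, Integer::sum);
    }

    // The read of both counts and the update happen under the same lock (this).
    synchronized void addContainer(String containerId, String host) {
        int requestCountOnThisHost = requestCountPerHost.getOrDefault(host, 0);
        int allocatedOnThisHost = bucket(host).size();
        if (allocatedOnThisHost < requestCountOnThisHost) {
            bucket(host).add(containerId);
        } else {
            // Surplus container for this host: make it available to any request.
            bucket(ANY_HOST).add(containerId);
        }
    }

    synchronized List<String> containersOn(String host) {
        return new ArrayList<>(allocatedPerHost.getOrDefault(host, Collections.<String>emptyList()));
    }

    // Private helper: only called from synchronized methods, so the lock is already held.
    private List<String> bucket(String host) {
        return allocatedPerHost.computeIfAbsent(host, h -> new ArrayList<>());
    }
}
```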


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java, line 136
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055366#file1055366line136>
> >
> >     It would be good to comment on under which lock this method should be called.

I made this a private method to the class (will be available in the next RB patch) because it is just a helper method. Not intended for any client to directly call it.


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/scala/org/apache/samza/job/yarn/ContainerUtil.scala, line 19
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055372#file1055372line19>
> >
> >     Any reason this class is still in scala? If it is new/refactored, would it make more sense to change it to java?

:) At one point, I just got tired of moving things to Java :P It is hard to find Java equivalents of certain Scala features.

If this class doesn't take any functions as parameters, I will move it to Java. Otherwise, I prefer to keep it this way for now.


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala, line 174
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055384#file1055384line174>
> >
> >     I thought that the purpose of these tests is orthogonal to the host-affinity change, so they would need to be kept here?

So, I nuked the SamzaAppMasterTaskManager.scala class. Part of the operations of that class is now in SamzaTaskManager.java and the rest in ContainerAllocator.java. 
I have added unit tests for each of those classes covering all relevant cases from TestSamzaAppMasterTaskManager.scala. I will update this patch with those changes.


- Navina


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37817/#review97059
-----------------------------------------------------------


On Aug. 26, 2015, 10:14 p.m., Navina Ramesh wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/37817/
> -----------------------------------------------------------
> 
> (Updated Aug. 26, 2015, 10:14 p.m.)
> 
> 
> Review request for samza, Yan Fang, Chinmay Soman, Chris Riccomini, and Yi Pan (Data Infrastructure).
> 
> 
> Bugs: SAMZA-619
>     https://issues.apache.org/jira/browse/SAMZA-619
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> This is a major change to how we request and assign containers to resources. It uses a threaded model for request and allocation. More comments in the javadoc.
> 
> Major changes to note:
> 1. Changed the YARN dependency to 2.6
> 2. Moved YarnConfig to a Java config class
> 3. Removed SamzaAppMasterTaskManager
> 4. SamzaAppState replaces SamzaAppMasterState
> 5. SamzaTaskManager is very similar to SamzaAppMasterTaskManager, except that it delegates onContainerAllocated and requestContainers to the thread in ContainerAllocator
> 6. Removed state.unclaimedContainers
> 7. Allocator thread sleep time and request timeout are configurable
> 8. Host affinity can be enabled with the config "yarn.samza.host-affinity.enabled". Host affinity is guaranteed to work with the FairScheduler when continuous scheduling is enabled. Details on this config can be found in the SAMZA-617 [design](https://issues.apache.org/jira/secure/attachment/12726945/DESIGN-SAMZA-617-2.pdf).
> 
> Pending items:
> 1. Adding unit tests for AppMaster, TaskManager, ContainerAllocator
> 2. Update config documentation 
> 3. Update web-site with info on this feature
> 
> 
> Diffs
> -----
> 
>   build.gradle 3a7fabcd4f4e5c8db950c3a33bba33618c5565b4 
>   gradle/dependency-versions.gradle 36d564b6ca895f042ee4802643e49180f4947b62 
>   samza-core/src/main/java/org/apache/samza/container/LocalityManager.java c567bf4a1747a7a6aff23be74cf25b1a44e57b42 
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java 5455881e2a4a9c865137f2708f655641b93c2bfb 
>   samza-core/src/main/java/org/apache/samza/job/model/JobModel.java 7b592740c082225fc217a24188bc874f883db63b 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 61e228b1ff8619da4ad2e11b15a5afb7999c4996 
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala c8438bd6ed01a95860867505d84c04c493a5a197 
>   samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerFailure.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java PRE-CREATION 
>   samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml ce2145adde09b0e8097a1c711bc296ee20392e63 
>   samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala 1f51551893c42bb13d7941c8b0e5594ac7f42585 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/ContainerUtil.scala PRE-CREATION 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala af42c6a6636953a95f79837fe372e0dbd735df70 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala d475c4c566f17f88a04db5fbc84cc2e27eb333d2 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala 03acfe1bbbabf8f54be9f36fdae785476da45135 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala 060538623e4d67b986bc635518e7fe8ebdde9e24 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala f667c83a7fff43a5efdc64cde019b2cf35f38cb9 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala 1743c8611a94fa1c7f7dafd8ff8c713039f3df8c 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala 8dd70c9977473e083e463d01b049c40e15b21f4a 
>   samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala 09f4dc32a4b18aeb3accb856c360ea2f95c82673 
>   samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala 7fd51221149ba59e7ea1bc0b51d58726ec17a7d7 
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala df5992e659302d2918c4e2c30b6122ed51ab9fe8 
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala 6f4bfaf916d687426f746fd3b09cd56490bb500e 
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 2eec65f02826de40493925c08ff344a8cc4feecb 
> 
> Diff: https://reviews.apache.org/r/37817/diff/
> 
> 
> Testing
> -------
> 
> Tested with hello-samza locally. Tested with a sample job on a 3-node YARN cluster. Works as expected.
> 
> 
> Thanks,
> 
> Navina Ramesh
> 
>