Posted to dev@samza.apache.org by Xiang Fu <fx...@gmail.com> on 2016/08/19 19:20:36 UTC

Review Request 51250: Support container restart when partition count diff happens.

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51250/
-----------------------------------------------------------

Review request for samza.


Repository: samza


Description
-------

Support container restart when partition count diff happens.


Diffs
-----

  samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java 8652465ca6764bc69a2767bc3a4f436ca9f8a401 
  samza-core/src/main/java/org/apache/samza/job/model/JobModel.java dbd6dcc41644e9ea1e6a12dcd18f44ef2e63bc72 
  samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala ba38b5cfa4e61b5513ce38dd2be32438b62cd7ce 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java 96d3d7cc2853356a338dc25067f01440c938e216 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java dacc52de0a34498a715a299bc69c95aebd1b08ba 

Diff: https://reviews.apache.org/r/51250/diff/


Testing
-------


Thanks,

Xiang Fu


Re: Review Request 51250: Support container restart when partition count diff happens.

Posted by "Yi Pan (Data Infrastructure)" <yi...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51250/#review146427
-----------------------------------------------------------



@Xiang, thanks for putting up this long-wanted improvement! I mainly have one high-level comment: when updating the job model, we need to make sure that containers running w/ the old job model do not double-process the same partitions as containers running w/ the new job model. Thanks!


samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java (line 21)
<https://reviews.apache.org/r/51250/#comment212856>

    nit: generally, we try to avoid imports w/ "*". Any reason you use a "*" here?



samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala (line 125)
<https://reviews.apache.org/r/51250/#comment212865>

    This section of code is an exact copy of lines 82-93. It would be better to make it a common function instead of copying the code.



samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala (line 148)
<https://reviews.apache.org/r/51250/#comment213051>

    Why not pass in the oldJobModel's changelog partition mapping? Ideally, from line 139 on, the code should be almost the same as in initializeJobModel(). It would be better to use a common function
    
    createJobModel(config: Config, 
      changelogManager: ChangelogPartitionManager,
      localityManager: LocalityManager,
      streamMetadataCache: StreamMetadataCache,
      previousChangelogPartition: util.HashMap[TaskName, Integer])
      
    to avoid copying the code.



samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala (line 165)
<https://reviews.apache.org/r/51250/#comment212863>

    What's the reason to *always* set this to oldest here? This would change the default behavior that the user may have configured in the old config. I am curious what the rationale behind it is.



samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala (line 284)
<https://reviews.apache.org/r/51250/#comment213053>

    When the input partition count changes, which may cause the number of tasks to change, the changelog partition map may change as well. I think that we should still update the changelog partition mapping.
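
A minimal sketch of one way to update the mapping, assuming the goal is to keep every existing task on its current changelog partition and give each newly created task the next unused partition id. The class and method names are hypothetical, and plain strings stand in for Samza's TaskName; this is not the code under review.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical helper, not part of Samza.
final class ChangelogMappingSketch {

    /**
     * Returns a new mapping that preserves every previous task-to-partition
     * assignment and assigns each task missing from it the next unused id.
     */
    static Map<String, Integer> extend(Map<String, Integer> previous,
                                       Collection<String> taskNames) {
        Map<String, Integer> updated = new TreeMap<>(previous);

        // The next free id is one past the largest id already handed out.
        int next = 0;
        for (int partition : previous.values()) {
            next = Math.max(next, partition + 1);
        }

        // Visit tasks in sorted order so the assignment is deterministic.
        for (String task : new TreeSet<>(taskNames)) {
            if (!updated.containsKey(task)) {
                updated.put(task, next++);
            }
        }
        return updated;
    }

    public static void main(String[] args) {
        Map<String, Integer> previous = new HashMap<>();
        previous.put("Partition 0", 0);
        previous.put("Partition 1", 1);

        // The input stream grew from 2 to 4 partitions, adding two tasks.
        System.out.println(extend(previous, Arrays.asList(
                "Partition 0", "Partition 1", "Partition 2", "Partition 3")));
        // prints {Partition 0=0, Partition 1=1, Partition 2=2, Partition 3=3}
    }
}
```

Keeping old assignments stable matters because a task that moved to a different changelog partition would restore someone else's state.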



samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java (line 488)
<https://reviews.apache.org/r/51250/#comment213077>

    The concern here is:
    
    1. when stopContainer() fails after all 3 retries, the old container may still be running. Hence, it will leave the job running in a mixed mode: some containers running w/ the old job model and some w/ the new job model, which is bad and could lead to double processing or a missed partition.
    
    The safer solution seems to be:
    1. throw an exception if stopContainer() fails after all retries.
    2. do not start new containers until all old containers are killed
    
    There are multiple ways to implement 2:
    a) lock the container allocator in JobCoordinator until all old containers are killed. It is easier, but the downside is that the job will have a longer pause time.
    b) allow the containers to start in a "paused" mode on JobModel change, until all old containers are killed by the JobCoordinator. We would have to implement a push mechanism from the JobCoordinator to notify all containers to "resume operation". It is more complex, but enables finer-grained control s.t. some new containers can start earlier when only a subset of old containers has been killed (i.e. all new containers processing partitions that are not processed by old containers can start immediately).
    
    I would recommend going w/ option a) first, and gradually moving toward b).
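
A minimal sketch of option a), assuming stopContainer() can be modeled as a call that reports success or failure. All names here (RestartGateSketch, restart, the functional parameters) are hypothetical stand-ins for illustration and are not the YARN or Samza APIs.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical helper, not part of Samza or YARN.
final class RestartGateSketch {
    static final int MAX_STOP_ATTEMPTS = 3;

    /** Stops every old container, retrying each one; throws if any survives. */
    static void stopAll(List<String> oldContainerIds,
                        Predicate<String> stopContainer) {
        for (String id : oldContainerIds) {
            boolean stopped = false;
            for (int attempt = 1; attempt <= MAX_STOP_ATTEMPTS && !stopped; attempt++) {
                stopped = stopContainer.test(id); // true means the stop succeeded
            }
            if (!stopped) {
                // Fail loudly instead of leaving the job in a mixed mode.
                throw new IllegalStateException("Could not stop container " + id
                        + " after " + MAX_STOP_ATTEMPTS + " attempts");
            }
        }
    }

    /** Launches new containers only after all old containers are stopped. */
    static void restart(List<String> oldContainerIds,
                        List<String> newContainerIds,
                        Predicate<String> stopContainer,
                        Consumer<String> startContainer) {
        stopAll(oldContainerIds, stopContainer); // throws before any launch
        for (String id : newContainerIds) {
            startContainer.accept(id);
        }
    }
}
```

Because stopAll() throws before the launch loop runs, a failed stop can never coexist with containers built from the new job model; the cost is the longer pause noted above.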


- Yi Pan (Data Infrastructure)

