Posted to dev@samza.apache.org by Yi Pan <ni...@gmail.com> on 2018/02/02 01:27:59 UTC

Re: [DISCUSS] SEP: Host affinity in standalone.

Hi, Santhosh,

Thanks for the SEP and the latest revisions. Here are some of my comments
based on the latest proposal:

- The basic idea for implementing state-aware task-to-physical-process
assignment in JobModel is not quite clear. ContainerAllocator solves a
different host-affinity problem: placing the physical process on the same
physical host/resource, while the JobModel groups tasks into container
processes. In YARN, we need to solve both affinity problems in code:
ContainerAllocator places the container on a specific physical host, and
the TaskNameGrouper's balance() method minimizes task movement, assuming
ContainerAllocator succeeds. In standalone, we only need to solve the
assignment of tasks to physical hosts. So, I think the main mismatches are:

   a) Standalone does not need or rely on ContainerAllocator to associate
containers with physical hosts. That association is already made and
reported by the containers in a job.

   b) The current TaskNameGrouper with the balance() method only considers
the existing task-to-container mapping, which assumes that the
container-to-physical-resource mapping is handled by another layer
somewhere else. Hence, it would be nicer to unify the balance() method to
directly consider the existing task-to-physical-resource mapping, which
works for both YARN and standalone deployments.


Having said that, I think that we will need the following in JobModel
generator:

1) ContainerInfo: (containerId, physicalResourceId)

    * this mapping is the reported mapping from container processes in
standalone, and preferred mapping in YARN

2) TaskLocality: (taskId, physicalResourceId)

    * this mapping is actually reported task location from container
processes in both standalone and YARN

Then, the TaskNameGrouper.group() method tries its best to group tasks
onto containerIds with the same physicalResourceId in a balanced pattern,
in both YARN and standalone.

In YARN, the JobModel will be used by ContainerAllocator to achieve the
preferred container-to-physical-resource mapping, while in standalone, the
JobModel will be directly used by container processes to start processing.
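
The grouping described above can be illustrated with a minimal, hypothetical
Java sketch. It is not Samza's TaskNameGrouper API: the class and method
names are illustrative assumptions. Containers are given as a map from
containerId to physicalResourceId (the ContainerInfo pairs), previous task
locations as a map from task name to physicalResourceId (the TaskLocality
pairs), and the balancing rule (cap each container at ceil(tasks /
containers)) is an assumption chosen for the example. Tasks whose previous
location has no container with room, or that have no recorded location, go
to the least-loaded container.

```java
import java.util.*;

public class LocalityAwareGrouper {
  /**
   * Hypothetical sketch: assigns tasks to containers, preferring a container whose
   * physicalResourceId matches the task's last reported location, while capping each
   * container at ceil(tasks / containers) tasks to keep the assignment balanced.
   */
  static Map<String, List<String>> group(List<String> tasks,
                                         Map<String, String> containerLocation,
                                         Map<String, String> taskLocality) {
    if (containerLocation.isEmpty()) {
      throw new IllegalArgumentException("at least one container is required");
    }
    List<String> containers = new ArrayList<>(containerLocation.keySet());
    Collections.sort(containers); // deterministic iteration order
    int capacity = (tasks.size() + containers.size() - 1) / containers.size();
    Map<String, List<String>> assignment = new TreeMap<>();
    for (String c : containers) assignment.put(c, new ArrayList<>());

    List<String> sortedTasks = new ArrayList<>(tasks);
    Collections.sort(sortedTasks);
    List<String> unplaced = new ArrayList<>();
    // Pass 1: honor the previous locality where a same-location container has room.
    for (String task : sortedTasks) {
      String location = taskLocality.get(task);
      String target = null;
      for (String c : containers) {
        if (location != null && location.equals(containerLocation.get(c))
            && assignment.get(c).size() < capacity
            && (target == null || assignment.get(c).size() < assignment.get(target).size())) {
          target = c;
        }
      }
      if (target != null) assignment.get(target).add(task);
      else unplaced.add(task);
    }
    // Pass 2: remaining tasks are treated as "any host" and go to the least-loaded container.
    for (String task : unplaced) {
      String target = containers.get(0);
      for (String c : containers) {
        if (assignment.get(c).size() < assignment.get(target).size()) target = c;
      }
      assignment.get(target).add(task);
    }
    return assignment;
  }

  public static void main(String[] args) {
    Map<String, String> containers = Map.of("c0", "hostA", "c1", "hostB");
    Map<String, String> locality = Map.of("t0", "hostB", "t1", "hostB", "t2", "hostB");
    // prints {c0=[t2, t3], c1=[t0, t1]}
    System.out.println(group(List.of("t0", "t1", "t2", "t3"), containers, locality));
  }
}
```

In the example, t2 also prefers hostB, but c1 is already at capacity, so
balance wins over locality for that task.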


- It is not quite clear to me what the distributed barrier in the graph is
used for. Is it for every container process to pick up a certain version of
the JobModel? Who is waiting on the barrier: the leader, the followers, or
everyone?


- Additional question: if the input topic partitions change, or the SSP
grouper changes, the task-to-SSP mapping changes and the task locality may
not make sense at all. Is this considered out of scope for the current
design? Or should we at least add a version number for the task-to-SSP map
and associate it with the task locality info?
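
The versioning idea in the question above might look like the following
minimal sketch. The TaskLocality record and its taskToSspVersion field are
hypothetical, not part of the SEP or Samza; the point is only that a stored
location is reused when it was recorded under the task-to-SSP mapping
currently in effect.

```java
import java.util.*;

public class VersionedTaskLocality {
  // Hypothetical: locality info tagged with the version of the task-to-SSP mapping
  // that was in effect when the locality was recorded.
  record TaskLocality(String locationId, int taskToSspVersion) {}

  // Keeps only entries recorded under the current task-to-SSP mapping; entries from
  // an older mapping are dropped because the task may now own different partitions.
  static Map<String, String> usableLocality(Map<String, TaskLocality> stored, int currentVersion) {
    Map<String, String> result = new TreeMap<>();
    for (Map.Entry<String, TaskLocality> e : stored.entrySet()) {
      if (e.getValue().taskToSspVersion() == currentVersion) {
        result.put(e.getKey(), e.getValue().locationId());
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, TaskLocality> stored = Map.of(
        "Partition 0", new TaskLocality("hostA", 2),
        "Partition 1", new TaskLocality("hostB", 1));
    System.out.println(usableLocality(stored, 2)); // {Partition 0=hostA}
  }
}
```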


- Should the leader validate that everyone has picked up the new version of
JobModel and reported the correct task-locality expected in the JobModel,
after step 10 in the graph?


- Why are we missing a processor-to-locationId mapping in the zknode data
model? Is it a value written to processor.00000N node? Also, why don’t we
write locationId as a value to task0N znode, instead of a child node? And
which znode is the distributed barrier that you used in the graph?


- In State store restoration: “nonexistent local stores will be restored…”
is not clear to me. What do you mean by “nonexistent” here?


- So, based on the description in “Container to physical host assignment”,
you will need to know the full two-level mapping in locality info:
task-to-container-to-physical-host. If that’s the case, how do you keep the
task-to-container mapping in your current znode structure which only has
task0N as children znodes? Please make it clear.


- For Semantics of host affinity with run.id, “all stream processors of a
samza application stops” = one run is not true for YARN-deployed Samza
applications, and the description in that section is very confusing. As we
discussed last time, we should define a continuous application run in terms
of the continuity of the local states, checkpoints, and intermediate message
streams. Within the same run.id, the continuity of the
states/checkpoints/intermediate message streams is guaranteed, while a
different run.id means a disruptive change to the application that throws
away all partial states. The actual conditions that determine continuity of
state in a Samza application may differ across deployment environments and
input sources, but the consistent semantic meaning of “run.id” should be the
continuity of states. With that in mind, the only relevant point is probably
that host affinity only makes sense within the same run.id.


- what’s the definition of LocationId? An interface? An abstract class?
What’s the specific sub-class for LocationId in RAIN vs in YARN? And
potentially, in other environment like Kubernetes?


- It seems like we can deprecate the whole BalancingTaskNameGrouper
altogether.


- So, it seems that the LocalityManager has no change from the interface
methods. I.e. it still only maintains the container-to-physical-resource
mapping as it does today. That also means that you will somehow store the
task-to-container mapping info in the locality znode as well. It would be
nice to make it clear how the task-to-container-to-physical-resource
mapping is stored and read in ZK.


- The proposal also needs to include a compatibility test after
deprecating/changing the TaskNameGrouper API, to make sure the behavior of
the default groupers stays the same.


- In Compatibility section, remove the future versions (i.e. 0.15) since we
are not sure when this change can be completed and released yet. It seems
that we are not changing the persistent data format for locality info in
coordinator stream. Make it explicit.


- If you are making the LocalityManager class an interface, are you planning
to make it pluggable as well? Actually, I was thinking that the model we
wanted to go with is to make the metadata store for locality info a
pluggable interface, while keeping LocalityManager as a common
implementation.


- Lastly, with the grouper API definition, you will not be able to get the
physical location info unless it is passed in via
currentGenerationProcessorIds or ContainerModel. How are you going to
resolve that without creating a LocalityManager in the Grouper
implementation class? I would strongly recommend not creating an instance
of LocalityManager in the Grouper implementation class.

On Wed, Jan 10, 2018 at 8:00 PM, santhosh venkat <
santhoshvenkat1988@gmail.com> wrote:

> Hi,
>
>
> I created SEP for SAMZA-1554
> <https://issues.apache.org/jira/browse/SAMZA-1554>: Host affinity in
> standalone.
>
>
> The link to the SEP is here:
>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75957309
>
>
> Please review and comments are welcome.
>
>
> Thanks.
>

Re: [DISCUSS] SEP: Host affinity in standalone.

Posted by santhosh venkat <sa...@gmail.com>.
Boris,

Thanks for your review. Responses are inline.

>> I think we also need MetadataStorage one (details may be worked out
later) to hide the locality storage implementation details.

          - Agreed. My initial proposal had a LocalityManager interface that
was logically similar to MetadataStore. I renamed the interface to
MetadataStore and updated it everywhere in the proposal.


   >>  Instead of using physical hostname we should stick to the
LocationId, since some VMs may be running multiple processors on a single
physical host.

          - Agreed. This is taken into account, and we have a pluggable
abstraction to generate it for different execution environments.


   >> I think, though, using function names doesn't give enough clarity on
what is going on. May be we should add more explanation.

          - Updated the proposal based upon this comment.



   >> First diagram describes how local storage works. Please label it as
such.

          - Updated the proposal to address this comment.


   >> Some time the perfect mapping to the same Locality is not
possible(especially when a task dies and is distributed between other
tasks).

          - Yes, this is the worst-case scenario, in which the task is
assigned to any processor because no live processor is registered from its
preferred host.

Thanks.

Re: [DISCUSS] SEP: Host affinity in standalone.

Posted by santhosh venkat <sa...@gmail.com>.
Yi,

Thanks for taking time to review this and providing your feedback.
Responses inline.

>> 1) ContainerInfo: (containerId, physicalResourceId): this mapping is the
reported mapping from container processes in standalone, and preferred
mapping in YARN

>> 2) TaskLocality: (taskId, physicalResourceId):this mapping is actually
reported task location from container processes in both standalone and
YARN.

Yes, I totally agree with the above points. I think the group interface
contract already reflects that. In standalone, the locationIds registered by
live processors and the task locality of the previous generation will be
used to calculate the assignment of the current generation. In YARN, the
preferred host mapping will be used for task and processor locality. Any
new task/processor whose locality is unknown (unavailable in the preferred
host mapping or the task locality in the underlying storage layer) will be
treated as any_host during assignment.

I don't think it's a good idea to unify the locality storage formats
between YARN and standalone as part of this change (that would require an
elaborate migration plan and extensive testing). I think it's fair to
consider it out of scope for this proposal.

>>  Should the leader validate that everyone has picked up the new version
of JobModel and reported the correct task-locality expected in the
JobModel, after step 10 in the graph?
Though it would be an extra precaution taken by the leader to ensure
correctness, I think it might be a premature optimization. Even in the
existing YARN setup, the ApplicationMaster does not perform corresponding
validations after generations. I think it's fair to keep the behavior
consistent.

>> Why are we missing a processor-to-locationId mapping in the zknode data
model?
It is stored as part of the value of the live processor znodes. I had it in
my initial proposal, but received feedback to only include things that I'm
changing in the existing setup (hence I removed it). I've added it back now.

>> Also, why don’t we write locationId as a value to task0N znode, instead
of a child node?
It is stored as the value of the task ZooKeeper node. I was unable to
represent that pictorially in the ZooKeeper hierarchical model (hence I drew
it one level down, like a child). I added corresponding descriptions to the
data model to make it clear.

>> And which znode is the distributed barrier that you used in the graph?
This was removed after initial feedback (suggesting to only include things
I'm changing in the data model). I've added the barrier ZooKeeper node to
the data model for clarity.

I think we are on the same page about most of the choices made in this
proposal. If there are other major concerns or feedback, let's discuss
offline.

Thanks.

Re: [DISCUSS] SEP: Host affinity in standalone.

Posted by santhosh venkat <sa...@gmail.com>.
Hi Yi,

Thanks for your feedback. Responses inline.


> It seems like we can deprecate the whole BalancingTaskNameGrouper
altogether.

           - Yes, that’s part of the proposed interface changes.

>  That also means that you will somehow store the task-to-container
mapping info in the locality znode as well. It would be nice to make it
clear how the task-to-container-to-physical-resource mapping is stored and
read in ZK.

            - I think the task assignment (task-to-processor) mapping is
already stored in the most recent version of the JobModel in ZooKeeper. I
don't see the value in duplicating it in the locality znode as well (which
would burden us with keeping the same data consistent in two places). I want
to derive the container-to-locationId mapping from the task-to-locality
mapping in the locality ZooKeeper node and the container-to-task mapping
available in the latest JobModel. When new processors join, they will not
have any previous task assignment. Any new tasks added by changing the
SSPGrouper will not have any previous host assigned (they will be open to
being distributed to any available processor in the group). Please share
your thoughts. I will update the proposal if there's a consensus on this.
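
The derivation proposed above (joining the container-to-task mapping from
the latest JobModel with the task-to-locationId mapping from the locality
znode) can be sketched as follows. This is a hypothetical illustration, not
the SEP's code: plain maps stand in for the JobModel and the znode contents,
and the tie-breaking rule (the most common locationId among a container's
tasks wins, ties broken lexicographically) is an assumption the thread does
not specify.

```java
import java.util.*;

public class ContainerLocationDeriver {
  /**
   * Hypothetical sketch: derives each container's previous locationId from the
   * container-to-tasks mapping (latest JobModel) and the task-to-locationId mapping
   * (locality znode). The most common locationId among a container's tasks wins.
   * Containers with no known task locality are omitted.
   */
  static Map<String, String> deriveContainerLocations(Map<String, List<String>> containerToTasks,
                                                      Map<String, String> taskToLocation) {
    Map<String, String> result = new TreeMap<>();
    for (Map.Entry<String, List<String>> entry : containerToTasks.entrySet()) {
      Map<String, Integer> counts = new TreeMap<>();
      for (String task : entry.getValue()) {
        String location = taskToLocation.get(task);
        if (location != null) counts.merge(location, 1, Integer::sum);
      }
      String best = null;
      for (Map.Entry<String, Integer> c : counts.entrySet()) {
        // TreeMap iterates keys in sorted order, so strict '>' keeps the smallest key on ties.
        if (best == null || c.getValue() > counts.get(best)) best = c.getKey();
      }
      if (best != null) result.put(entry.getKey(), best);
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, List<String>> jobModel = Map.of(
        "processor-0", List.of("t0", "t1", "t2"),
        "processor-1", List.of("t3"));
    Map<String, String> locality = Map.of("t0", "hostA", "t1", "hostA", "t2", "hostB");
    System.out.println(deriveContainerLocations(jobModel, locality)); // {processor-0=hostA}
  }
}
```

Here processor-1 has no derived location because its only task, t3, has no
recorded locality, which matches the any-host treatment of new tasks.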


> Why are we missing a processor-to-locationId mapping in the zknode data
model?

            - I'm planning to derive it from the task-to-locationId mapping
in the locality ZooKeeper node and the container-to-taskId mapping in the
latest JobModel.


>  Also needs to include compatibility test after deprecating/changing
TaskNameGrouper API, make sure the default behavior of default groupers is
the same.

        - Added it to the compatibility section.


>  In Compatibility section, remove the future versions (i.e. 0.15) since
we are not sure when this change can be completed and released yet. It
seems that we are not changing the persistent data format for locality info
in coordinator stream. Make it explicit.

        - Updated the compatibility section.

>  If you are making LocalityManager class an interface, are you planning
to make it pluggable as well? Actually, I was thinking that the model we
wanted to go is that making the metadata store for locality info an
interface and pluggable, while keep the LocalityManager as a common
implementation.

        - Yes, LocalityManager is a pluggable interface (there will be two
implementations: one for the coordinator stream and the other for
ZooKeeper). I think you're proposing the same thing as my change, but with
a different interface name (MetaStore instead of LocalityManager). I don't
think there'll be any value in a LocalityManager class at all once we have
the meta-store interface.
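
A pluggable locality store of the kind discussed above could have roughly
the following shape. This is a hypothetical sketch: LocalityMetadataStore
and its methods are illustrative names, not Samza's actual MetadataStore
API, and the in-memory implementation only stands in for the
coordinator-stream and ZooKeeper backends mentioned in the thread.

```java
import java.util.*;

// Hypothetical pluggable store for task locality; coordinator-stream and ZooKeeper
// backends would each implement this same contract.
interface LocalityMetadataStore {
  Optional<String> getTaskLocation(String taskName);
  void putTaskLocation(String taskName, String locationId);
  Map<String, String> readAllTaskLocations();
}

// In-memory stand-in, used only to illustrate the contract.
class InMemoryLocalityMetadataStore implements LocalityMetadataStore {
  private final Map<String, String> taskLocations = new TreeMap<>();

  @Override public Optional<String> getTaskLocation(String taskName) {
    return Optional.ofNullable(taskLocations.get(taskName));
  }

  @Override public void putTaskLocation(String taskName, String locationId) {
    taskLocations.put(taskName, locationId);
  }

  @Override public Map<String, String> readAllTaskLocations() {
    // Defensive, read-only snapshot so callers cannot mutate the store.
    return Collections.unmodifiableMap(new TreeMap<>(taskLocations));
  }
}

public class LocalityStoreDemo {
  public static void main(String[] args) {
    LocalityMetadataStore store = new InMemoryLocalityMetadataStore();
    store.putTaskLocation("Partition 0", "hostA");
    // A task with no stored location falls back to "any_host".
    System.out.println(store.getTaskLocation("Partition 0").orElse("any_host")); // hostA
    System.out.println(store.getTaskLocation("Partition 1").orElse("any_host")); // any_host
  }
}
```

Keeping the grouper dependent only on such an interface (or on data passed
in through the grouper API) would avoid constructing a LocalityManager
inside the Grouper implementation, as requested earlier in the thread.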

>  what’s the definition of LocationId? An interface? An abstract class?
           - It's a unique identifier that represents the
virtual container/physical host (any physical execution environment) in
which a processor runs. It's a pluggable interface (similar to
ProcessorIdGenerator). In YARN, the physical hostname will be used as the
locationId. In standalone, a combination of slice-name, slice-id and
instance-id will be used as the locationId.
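
A pluggable LocationId abstraction as described above might look like this
minimal sketch. LocationIdProvider and both implementations are
hypothetical names; the "_" separator for the standalone combination is an
assumption. The hostname is injected rather than looked up, so a caller
could pass, for example, InetAddress.getLocalHost().getHostName().

```java
import java.util.Objects;

// Hypothetical abstraction, similar in spirit to ProcessorIdGenerator: each execution
// environment supplies its own identifier for the place a processor runs.
interface LocationIdProvider {
  String getLocationId();
}

// YARN-style provider: the physical hostname is the locationId.
class HostnameLocationIdProvider implements LocationIdProvider {
  private final String hostname;

  HostnameLocationIdProvider(String hostname) {
    this.hostname = Objects.requireNonNull(hostname);
  }

  @Override public String getLocationId() {
    return hostname;
  }
}

// Standalone-style provider: combines slice name, slice id and instance id.
// The "_" separator is an illustrative assumption.
class SliceLocationIdProvider implements LocationIdProvider {
  private final String sliceName;
  private final String sliceId;
  private final String instanceId;

  SliceLocationIdProvider(String sliceName, String sliceId, String instanceId) {
    this.sliceName = Objects.requireNonNull(sliceName);
    this.sliceId = Objects.requireNonNull(sliceId);
    this.instanceId = Objects.requireNonNull(instanceId);
  }

  @Override public String getLocationId() {
    return String.join("_", sliceName, sliceId, instanceId);
  }
}

public class LocationIdDemo {
  public static void main(String[] args) {
    LocationIdProvider yarn = new HostnameLocationIdProvider("host-17.example.com");
    LocationIdProvider standalone = new SliceLocationIdProvider("search", "3", "17");
    System.out.println(yarn.getLocationId());       // host-17.example.com
    System.out.println(standalone.getLocationId()); // search_3_17
  }
}
```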

> For Semantics of host affinity with run.id, “all stream processors of a samza
application stops” = one run is not true for YARN deployed Samza
applications.
          - Updated the proposal based upon this feedback.

> Lastly, from the grouper API definition, you will not be able to get the
physical location info, if it is not passed in via
currentGenerationProcessorIds or ContainerModel. How are you going to
resolve that w/o creating a LocalityManager in Grouper implementation
class? I would strongly recommend no to create an instance of
LocalityManager in the Grouper implementation class.
           - LocationId is part of the ContainerModel class and will be
used to propagate the previous run's locality information.

> if the input topic partitions change, or the SSP grouper changes, the
task-to-ssp mapping changes and the task locality may not make sense at
all. Is this considered out-of-scope.

           - SystemStreamPartitionCountMonitor is out of scope for this
proposal. I think we can detect an input stream partition change by
comparing the previous JobModel version with the current number of
partitions of the input streams, and purge the task locality information
when they differ. After SEP-5, the existing task locality information can
be reused.
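
The purge rule in the reply above can be sketched as a small check. This is
a hypothetical illustration: the per-stream partition counts are plain maps
here, where previousPartitionCounts would be derived from the SSPs in the
previous JobModel and currentPartitionCounts from the input systems'
metadata.

```java
import java.util.*;

public class LocalityPurger {
  /**
   * Hypothetical sketch: if the set of input streams or any stream's partition count
   * changed since the previous JobModel, the stored task locality is discarded, because
   * tasks may now own different partitions; otherwise it is reused as-is.
   */
  static Map<String, String> localityToReuse(Map<String, Integer> previousPartitionCounts,
                                             Map<String, Integer> currentPartitionCounts,
                                             Map<String, String> storedTaskLocality) {
    if (!previousPartitionCounts.equals(currentPartitionCounts)) {
      return Collections.emptyMap();
    }
    return storedTaskLocality;
  }

  public static void main(String[] args) {
    Map<String, String> stored = Map.of("Partition 0", "hostA");
    System.out.println(localityToReuse(Map.of("orders", 8), Map.of("orders", 16), stored)); // {}
    System.out.println(localityToReuse(Map.of("orders", 8), Map.of("orders", 8), stored));  // {Partition 0=hostA}
  }
}
```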

> not quite clear to me that what’s the distributed barrier is used for in
the graph. For every container process to pick up a certain version of
JobModel? Who is waiting on the barrier? The leader or the followers? Or
everyone?
            - Both the leader and the followers wait on the barrier to agree
upon a JobModel. I will add that to the state diagram.
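
The condition such a barrier waits for can be sketched as a pure check.
This is a hypothetical illustration, not the SEP's ZooKeeper barrier: a
real implementation would read each participant's report from ZooKeeper
nodes and block (for example, with watches) until the condition holds,
whereas this sketch only evaluates the condition over in-memory maps.

```java
import java.util.*;

public class JobModelBarrierCheck {
  /**
   * Hypothetical sketch: the barrier is reached once every participant of the current
   * generation (leader and followers alike) has reported the expected JobModel version.
   */
  static boolean barrierReached(Set<String> participants,
                                Map<String, String> reportedVersions,
                                String expectedVersion) {
    for (String p : participants) {
      if (!expectedVersion.equals(reportedVersions.get(p))) return false;
    }
    return true;
  }

  public static void main(String[] args) {
    Set<String> participants = Set.of("processor-0", "processor-1");
    Map<String, String> reports = new HashMap<>();
    reports.put("processor-0", "5");
    System.out.println(barrierReached(participants, reports, "5")); // false
    reports.put("processor-1", "5");
    System.out.println(barrierReached(participants, reports, "5")); // true
  }
}
```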

Thanks.

Re: [DISCUSS] SEP: Host affinity in standalone.

Posted by Yi Pan <ni...@gmail.com>.
Linking Boris' earlier comments to the correct [DISCUSSION] thread:
http://mail-archives.apache.org/mod_mbox/samza-dev/201801.mbox/%3CCAPAaT%2BtH2H5TEvFQUn9jw6iR%3DyvVEu46rDLJsqexpwKz0CAH1g%40mail.gmail.com%3E
