Posted to dev@spark.apache.org by Ben Sidhom <si...@google.com.INVALID> on 2019/11/16 00:09:54 UTC

Enabling fully disaggregated shuffle on Spark

I would like to start a conversation about extending the Spark shuffle
manager surface to support fully disaggregated shuffle implementations.
This is closely related to the work in SPARK-25299
<https://issues.apache.org/jira/browse/SPARK-25299>, which is focused on
refactoring the shuffle manager API (and in particular, SortShuffleManager)
to use a pluggable storage backend. The motivation for that SPIP is further
enabling Spark on Kubernetes.


The motivation for this proposal is enabling fully externalized
(disaggregated) shuffle service implementations. (Facebook’s Cosco shuffle
<https://databricks.com/session/cosco-an-efficient-facebook-scale-shuffle-service>
is one example of such a disaggregated shuffle service.) These changes
allow the bulk of the shuffle to run in a remote service such that minimal
state resides in executors and local disk spill is minimized. The net
effect is increased job stability and performance improvements in certain
scenarios. These changes should work well with, or be complementary to,
SPARK-25299. Some or all points may be merged into that issue as
appropriate.


Below is a description of each component of this proposal. These changes
can ideally be introduced incrementally. I would like to gather feedback
and gauge interest from others in the community to collaborate on this.
There are likely more points that would be useful to disaggregated shuffle
services. We can outline a more concrete plan after gathering enough input.
A working session could help us kick off this joint effort, perhaps
in the mid-January to mid-February timeframe (depending on interest and
availability). I’m happy to host at our Sunnyvale, CA offices.


Proposal

Scheduling and re-executing tasks

Allow coordination between the service and the Spark DAG scheduler as to
whether a given block/partition needs to be recomputed when a task fails or
when shuffle block data cannot be read. Having such coordination is
important, e.g., for suppressing recomputation after aborted executors or
for forcing late recomputation if the service internally acts as a cache.
One catchall solution is to have the shuffle manager provide an indication
of whether shuffle data is external to executors (or nodes). Another
option: allow the shuffle manager (likely on the driver) to be queried for
the existence of shuffle data for a given executor ID (or perhaps map task,
reduce task, etc). Note that this is at the level of data the scheduler is
aware of (i.e., map/reduce partitions) rather than block IDs, which are
internal details for some shuffle managers.
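
The second option above (querying the shuffle manager for the existence of shuffle data) can be sketched roughly as follows. This is a minimal, language-neutral illustration in Python, not Spark's API: `ExternalShuffleTracker`, `has_map_output`, and `partitions_to_recompute` are hypothetical names, and the sketch assumes the driver can learn from the service which map partitions have been committed remotely.

```python
from dataclasses import dataclass, field
from typing import Dict, Iterable, List, Set


@dataclass
class ExternalShuffleTracker:
    """Hypothetical driver-side view of which map outputs the remote service holds."""

    # shuffle_id -> map partition indices whose output is stored remotely
    committed: Dict[int, Set[int]] = field(default_factory=dict)

    def register_map_output(self, shuffle_id: int, map_partition: int) -> None:
        self.committed.setdefault(shuffle_id, set()).add(map_partition)

    def invalidate(self, shuffle_id: int, map_partition: int) -> None:
        # E.g. the service evicted the data because it acts as a cache.
        self.committed.get(shuffle_id, set()).discard(map_partition)

    def has_map_output(self, shuffle_id: int, map_partition: int) -> bool:
        return map_partition in self.committed.get(shuffle_id, set())


def partitions_to_recompute(
    tracker: ExternalShuffleTracker, shuffle_id: int, lost_partitions: Iterable[int]
) -> List[int]:
    """Map partitions that ran on a lost executor and still need to be rerun."""
    return sorted(
        p for p in lost_partitions if not tracker.has_map_output(shuffle_id, p)
    )
```

Note that the query is keyed by shuffle and map partition, which the scheduler already knows about, rather than by block ID.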

ShuffleManager API

Add a heartbeat (keep-alive) mechanism to RDD shuffle output so that the
service knows that data is still active. This is one way to enable
time-/job-scoped data because a disaggregated shuffle service cannot rely
on robust communication with Spark and in general has a distinct lifecycle
from the Spark deployment(s) it talks to. This would likely take the form
of a callback on ShuffleManager itself, but there are other approaches.
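
One way to picture the keep-alive idea is a lease table on the service side that heartbeats renew. The sketch below is in Python for brevity and is only an illustration under assumptions: `ShuffleLeaseTable`, the TTL value, and the explicit `now` argument are hypothetical and not part of Spark or of any existing shuffle service.

```python
class ShuffleLeaseTable:
    """Hypothetical service-side table of shuffle leases renewed by heartbeats."""

    def __init__(self, ttl_seconds: float):
        self.ttl_seconds = ttl_seconds
        self._expiry = {}  # shuffle_id -> time at which the lease lapses

    def heartbeat(self, shuffle_id: int, now: float) -> None:
        # Each heartbeat from the Spark side pushes the expiry forward.
        self._expiry[shuffle_id] = now + self.ttl_seconds

    def is_active(self, shuffle_id: int, now: float) -> bool:
        return self._expiry.get(shuffle_id, float("-inf")) > now

    def expire(self, now: float) -> list:
        """Drop lapsed leases and return the shuffle IDs whose data can be reclaimed."""
        dead = sorted(s for s, t in self._expiry.items() if t <= now)
        for shuffle_id in dead:
            del self._expiry[shuffle_id]
        return dead
```

Because the service only depends on heartbeats arriving, it can reclaim data for a Spark application that disappeared without sending any cleanup message.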


Add lifecycle hooks to shuffle readers and writers (e.g., to close/recycle
connections/streams/file handles as well as provide commit semantics).
SPARK-25299 adds commit semantics to the internal data storage layer, but
this is applicable to all shuffle managers at a higher level and should
apply equally to the ShuffleWriter.
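
As a rough illustration of writer lifecycle hooks with commit semantics, here is a Python sketch. It assumes a writer exposes `commit`, `abort`, and `close`; the class name `SketchShuffleWriter` and the dict used as a stand-in for the remote service are hypothetical and do not reflect the actual ShuffleWriter interface.

```python
class SketchShuffleWriter:
    """Hypothetical writer whose output becomes visible only on commit."""

    def __init__(self, sink: dict, map_partition: int):
        self._sink = sink  # stand-in for the remote shuffle service
        self._map_partition = map_partition
        self._pending = []
        self.closed = False

    def write(self, record: bytes) -> None:
        if self.closed:
            raise ValueError("writer is closed")
        self._pending.append(record)

    def commit(self) -> None:
        # Publish all buffered records for this map partition at once.
        self._sink[self._map_partition] = list(self._pending)

    def abort(self) -> None:
        # Discard buffered records; nothing becomes visible to readers.
        self._pending.clear()

    def close(self) -> None:
        # Release connections/streams/file handles here in a real implementation.
        self._pending = []
        self.closed = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        try:
            if exc_type is None:
                self.commit()
            else:
                self.abort()
        finally:
            self.close()
        return False  # never swallow the task's exception
```

The context-manager form guarantees that `close` runs whether the task succeeds or fails, which is the property the lifecycle hooks are meant to provide.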


Do not require ShuffleManagers to expose ShuffleBlockResolvers where they
are not needed. Ideally, this would be an implementation detail of the
shuffle manager itself. If there is substantial overlap between the
SortShuffleManager and other implementations, then the storage details can
be abstracted at the appropriate level. (SPARK-25299 does not currently
change this.)


Do not require MapStatus to include blockmanager IDs where they are not
relevant. This is captured by ShuffleBlockInfo
<https://docs.google.com/document/d/1d6egnL6WHOwWZe8MWv3m8n4PToNacdx7n_0iMSWwhCQ/edit#heading=h.imi27prnziyj>
including an optional BlockManagerId in SPARK-25299. However, this change
should be lifted to the MapStatus level so that it applies to all
ShuffleManagers. Alternatively, use a more general data-location
abstraction than BlockManagerId. This gives the shuffle manager more
flexibility and the scheduler more information with respect to data
residence.
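
The "more general data-location abstraction" could look something like the following Python sketch. All names here (`ExecutorLocation`, `RemoteServiceLocation`, `SketchMapStatus`, `lost_when_executor_dies`) are hypothetical; the point is only that a map status carrying a tagged location lets the scheduler tell executor-resident output apart from output held by an external service.

```python
from dataclasses import dataclass
from typing import Union


@dataclass(frozen=True)
class ExecutorLocation:
    """Output lives with a specific executor (the role BlockManagerId plays today)."""
    executor_id: str
    host: str
    port: int


@dataclass(frozen=True)
class RemoteServiceLocation:
    """Output lives in an external shuffle service, independent of any executor."""
    service_uri: str


DataLocation = Union[ExecutorLocation, RemoteServiceLocation]


@dataclass(frozen=True)
class SketchMapStatus:
    map_partition: int
    location: DataLocation


def lost_when_executor_dies(status: SketchMapStatus, executor_id: str) -> bool:
    """True only if the output resided on the executor that was lost."""
    loc = status.location
    return isinstance(loc, ExecutorLocation) and loc.executor_id == executor_id
```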

Serialization

Allow serializers to be used more flexibly and efficiently. For example,
have serializers support writing an arbitrary number of objects into an
existing OutputStream or ByteBuffer. This enables objects to be serialized
to direct buffers where doing so makes sense. More importantly, it allows
arbitrary metadata/framing data to be wrapped around individual objects
cheaply. Right now, that’s only possible at the stream level. (There are
hacks around this, but this would enable more idiomatic use in efficient
shuffle implementations.)
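
To make the per-object framing idea concrete, here is a small Python sketch that writes each object into an existing stream with its own header. It uses `pickle` purely as a stand-in serializer; the header layout (a 4-byte length and a 1-byte tag) and the function names are assumptions chosen for illustration, not a proposed wire format.

```python
import io
import pickle
import struct

# Hypothetical frame header: 4-byte big-endian payload length + 1-byte metadata tag.
HEADER = struct.Struct(">IB")


def write_framed(out, obj, tag: int) -> None:
    """Serialize one object into an existing stream, wrapped in its own frame."""
    payload = pickle.dumps(obj)
    out.write(HEADER.pack(len(payload), tag))
    out.write(payload)


def read_framed(inp):
    """Yield (tag, object) pairs until the stream is exhausted."""
    while True:
        header = inp.read(HEADER.size)
        if not header:
            return
        if len(header) < HEADER.size:
            raise EOFError("truncated frame header")
        length, tag = HEADER.unpack(header)
        payload = inp.read(length)
        if len(payload) < length:
            raise EOFError("truncated frame payload")
        yield tag, pickle.loads(payload)
```

Because each object carries its own length, a consumer can skip or route frames by tag without deserializing the payload.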


Have serializers indicate whether they are deterministic. This provides
much of the value of a shuffle service because it means that reducers do
not need to spill to disk when reading/merging/combining inputs--the data
can be grouped by the service, even without the service understanding data
types or byte representations. Alternative (less preferable since it would
break Java serialization, for example): require all serializers to be
deterministic.
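
The benefit of a deterministic serializer can be shown with a short Python sketch: if equal keys always serialize to identical bytes, a service can group records by comparing bytes alone. Here `json.dumps` with sorted keys stands in for a deterministic serializer, and `group_by_key_bytes` is a hypothetical service-side routine; neither is part of Spark.

```python
import json
from collections import defaultdict


def serialize_key(key) -> bytes:
    """Stand-in deterministic serializer: equal keys always yield equal bytes."""
    return json.dumps(key, sort_keys=True, separators=(",", ":")).encode("utf-8")


def group_by_key_bytes(records):
    """Group (key_bytes, value_bytes) pairs without deserializing anything."""
    groups = defaultdict(list)
    for key_bytes, value_bytes in records:
        groups[key_bytes].append(value_bytes)
    return dict(groups)
```

With a non-deterministic serializer, two equal keys could produce different byte strings and end up in different groups, so the reducer would still have to deserialize and merge them itself.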



--

- Ben

RE: Enabling fully disaggregated shuffle on Spark

Posted by "Jia, Ke A" <ke...@intel.com>.
Hi Ben and Felix,
This is Jia Ke from the Intel Big Data team. I'm also interested in this. Would you please add me to the invite? Thanks a lot.

Best regards,
Jia Ke
From: Qi,He <qi...@baidu.com>
Sent: Thursday, December 05, 2019 11:12 AM
To: Saisai Shao <sa...@gmail.com>
Cc: Liu,Linhong <li...@baidu.com>; Aniket Mokashi <an...@gmail.com>; Felix Cheung <fe...@hotmail.com>; Ben Sidhom <si...@google.com.invalid>; John Zhuge <jz...@apache.org>; bo yang <bo...@gmail.com>; Amogh Margoor <am...@qubole.com>; Ryan Blue <rb...@netflix.com>; Spark Dev List <de...@spark.apache.org>; Christopher Crosbie <cr...@google.com>; Griselda Cuevas <gr...@google.com>; Holden Karau <ho...@pigscanfly.ca>; Mayank Ahuja <ma...@qubole.com>; Kalyan Sivakumar <ka...@qubole.com>; alfozan@fb.com; Felix Cheung <fe...@uber.com>; Matt Cheah <mc...@palantir.com>; Yifei Huang (PD) <yi...@palantir.com>
Subject: Re: Enabling fully disaggregated shuffle on Spark

Hi Ben and Felix

This is Qi He from Baidu, on the same team as Linhong. I’m also interested in this. Would you please add me to the invite? Thanks a lot.

Thanks
Qi, He

From: Saisai Shao <sa...@gmail.com>
Date: Wednesday, December 4, 2019 at 5:57 PM
To: Greg Lee <li...@gmail.com>
Cc: "Liu,Linhong" <li...@baidu.com>, Aniket Mokashi <an...@gmail.com>, Felix Cheung <fe...@hotmail.com>, Ben Sidhom <si...@google.com.invalid>, John Zhuge <jz...@apache.org>, bo yang <bo...@gmail.com>, Amogh Margoor <am...@qubole.com>, Ryan Blue <rb...@netflix.com>, Spark Dev List <de...@spark.apache.org>, Christopher Crosbie <cr...@google.com>, Griselda Cuevas <gr...@google.com>, Holden Karau <ho...@pigscanfly.ca>, Mayank Ahuja <ma...@qubole.com>, Kalyan Sivakumar <ka...@qubole.com>, "alfozan@fb.com" <al...@fb.com>, Felix Cheung <fe...@uber.com>, Matt Cheah <mc...@palantir.com>, "Yifei Huang (PD)" <yi...@palantir.com>
Subject: Re: Enabling fully disaggregated shuffle on Spark

Hi Ben and Felix, I'm also interested in this. Would you please add me to the invite? Thanks a lot.

Best regards,
Saisai

On Mon, Dec 2, 2019 at 11:34 PM Greg Lee <li...@gmail.com> wrote:
Hi Felix & Ben,

This is Li Hao from Baidu, on the same team as Linhong.

As mentioned in Linhong’s email, an independent disaggregated shuffle service is also our solution, and a direction we continue to explore, for improving the stability of Hadoop MR and Spark in production. We would love to hear about this topic from the community and share our experience.

Please add me to this event, thanks.

Best Regards
Li Hao

On Fri, Nov 29, 2019 at 5:09 PM Liu,Linhong <li...@baidu.com> wrote:
Hi Felix & Ben,
This is Linhong from Baidu, based in Beijing. We are also using a disaggregated shuffle service internally (we call it DCE). We launched it in production 3 years ago for Hadoop shuffle. Last year we migrated Spark shuffle to the same DCE shuffle service, and stability improved a lot (we can handle more than 100T shuffle now).
It would be nice to have a Spark shuffle API that supports fully disaggregated shuffle, and my team and I would be very glad to share our experience and help on this topic.
So, if it’s possible, please add me to this event.

Thanks,
Liu, Linhong

From: Aniket Mokashi <an...@gmail.com>
Date: Thursday, November 21, 2019 at 2:12 PM
To: Felix Cheung <fe...@hotmail.com>
Cc: Ben Sidhom <si...@google.com.invalid>, John Zhuge <jz...@apache.org>, bo yang <bo...@gmail.com>, Amogh Margoor <am...@qubole.com>, Ryan Blue <rb...@netflix.com>, Spark Dev List <de...@spark.apache.org>, Christopher Crosbie <cr...@google.com>, Griselda Cuevas <gr...@google.com>, Holden Karau <ho...@pigscanfly.ca>, Mayank Ahuja <ma...@qubole.com>, Kalyan Sivakumar <ka...@qubole.com>, "alfozan@fb.com" <al...@fb.com>, Felix Cheung <fe...@uber.com>, Matt Cheah <mc...@palantir.com>, "Yifei Huang (PD)" <yi...@palantir.com>
Subject: Re: Enabling fully disaggregated shuffle on Spark

Felix - please add me to this event.

Ben - should we move this proposal to a doc and open it up for edits/comments?

On Wed, Nov 20, 2019 at 5:37 PM Felix Cheung <fe...@hotmail.com> wrote:
Great!

Due to a number of constraints, I won’t be sending the link directly here, but please r me and I will add you.


________________________________
From: Ben Sidhom <si...@google.com.INVALID>
Sent: Wednesday, November 20, 2019 9:10:01 AM
To: John Zhuge <jz...@apache.org>
Cc: bo yang <bo...@gmail.com>; Amogh Margoor <am...@qubole.com>; Ryan Blue <rb...@netflix.com>; Ben Sidhom <si...@google.com.invalid>; Spark Dev List <de...@spark.apache.org>; Christopher Crosbie <cr...@google.com>; Griselda Cuevas <gr...@google.com>; Holden Karau <ho...@pigscanfly.ca>; Mayank Ahuja <ma...@qubole.com>; Kalyan Sivakumar <ka...@qubole.com>; alfozan@fb.com <al...@fb.com>; Felix Cheung <fe...@uber.com>; Matt Cheah <mc...@palantir.com>; Yifei Huang (PD) <yi...@palantir.com>
Subject: Re: Enabling fully disaggregated shuffle on Spark

That sounds great!

On Wed, Nov 20, 2019 at 9:02 AM John Zhuge <jz...@apache.org> wrote:
That will be great. Please send us the invite.

On Wed, Nov 20, 2019 at 8:56 AM bo yang <bo...@gmail.com> wrote:
Cool, thanks Ryan, John, Amogh for the reply! Great to see you interested! Felix will have a Spark Scalability & Reliability Sync meeting on Dec 4 1pm PST. We could discuss more details there. Do you want to join?

On Tue, Nov 19, 2019 at 4:23 PM Amogh Margoor <am...@qubole.com> wrote:
We at Qubole are also looking at disaggregating shuffle on Spark. Would love to collaborate and share learnings.

Regards,
Amogh

On Tue, Nov 19, 2019 at 4:09 PM John Zhuge <jz...@apache.org> wrote:
Great work, Bo! Would love to hear the details.


On Tue, Nov 19, 2019 at 4:05 PM Ryan Blue <rb...@netflix.com.invalid> wrote:
I'm interested in remote shuffle services as well. I'd love to hear about what you're using in production!

rb

On Tue, Nov 19, 2019 at 2:43 PM bo yang <bo...@gmail.com> wrote:
Hi Ben,

Thanks for the write-up! This is Bo from Uber. I am on Felix's team in Seattle, working on disaggregated shuffle (we call it remote shuffle service, RSS, internally). We have had RSS in production for a while and learned a lot along the way (we tried quite a few techniques to improve remote shuffle performance). We could share what we learned with the community, and we would also like to hear feedback and suggestions on how to further improve remote shuffle performance. We can discuss in more detail if you or others are interested.

Best,
Bo


Re: Enabling fully disaggregated shuffle on Spark

Posted by "Qi,He" <qi...@baidu.com>.
Hi Ben and Felix

This is Qi He from Baidu, on the same team as Linhong. I’m also interested in this. Would you please add me to the invite? Thanks a lot.

Thanks
Qi, He


Re: Enabling fully disaggregated shuffle on Spark

Posted by Saisai Shao <sa...@gmail.com>.
Hi Ben and Felix, I'm also interested in this. Would you please add me to
the invite? Thanks a lot.

Best regards,
Saisai

Greg Lee <li...@gmail.com> 于2019年12月2日周一 下午11:34写道:

> Hi Felix & Ben,
>
> This is Li Hao from Baidu, same team with Linhong.
>
> As mentioned in Linhong’s email, independent disaggregated shuffle service
> is also our solution and continuous exploring direction for  improving
> stability of Hadoop MR and Spark in the production environment. We would
> love to hear about this topic from community and share our experience .
>
> Please add me to this event, thanks.
>
> Best Regards
> Li Hao
>
> Liu,Linhong <li...@baidu.com> 于2019年11月29日周五 下午5:09写道:
>
>> Hi Felix & Ben,
>>
>> This is Linhong from Baidu based in Beijing, and we are internally using
>> a disaggregated shuffle service (we call it DCE) as well. We launched this
>> in production 3 years ago for Hadoop shuffle. Last year we migrated spark
>> shuffle to the same DCE shuffle service and stability improved a lot (we
>> can handle more than 100T shuffle now).
>>
>> It would be nice if there is a Spark shuffle API support fully
>> disaggregated shuffle and my team and I are very glad to share our
>> experience and help on this topic.
>>
>> So, if It’s possible, please add me to this event.
>>
>>
>>
>> Thanks,
>>
>> Liu, Linhong
>>
>>
>>
>> *From: *Aniket Mokashi <an...@gmail.com>
>> *Date: *Thursday, November 21, 2019 at 2:12 PM
>> *To: *Felix Cheung <fe...@hotmail.com>
>> *Cc: *Ben Sidhom <si...@google.com.invalid>, John Zhuge <
>> jzhuge@apache.org>, bo yang <bo...@gmail.com>, Amogh Margoor <
>> amoghm@qubole.com>, Ryan Blue <rb...@netflix.com>, Spark Dev List <
>> dev@spark.apache.org>, Christopher Crosbie <cr...@google.com>,
>> Griselda Cuevas <gr...@google.com>, Holden Karau <ho...@pigscanfly.ca>,
>> Mayank Ahuja <ma...@qubole.com>, Kalyan Sivakumar <ka...@qubole.com>,
>> "alfozan@fb.com" <al...@fb.com>, Felix Cheung <fe...@uber.com>, Matt
>> Cheah <mc...@palantir.com>, "Yifei Huang (PD)" <yi...@palantir.com>
>> *Subject: *Re: Enabling fully disaggregated shuffle on Spark
>>
>>
>>
>> Felix - please add me to this event.
>>
>>
>>
>> Ben - should we move this proposal to a doc and open it up for
>> edits/comments.
>>
>>
>>
>> On Wed, Nov 20, 2019 at 5:37 PM Felix Cheung <fe...@hotmail.com>
>> wrote:
>>
>> Great!
>>
>>
>>
>> Due to number of constraints I won’t be sending link directly here but
>> please r me and I will add you.
>>
>>
>>
>>
>> ------------------------------
>>
>> *From:* Ben Sidhom <si...@google.com.INVALID>
>> *Sent:* Wednesday, November 20, 2019 9:10:01 AM
>> *To:* John Zhuge <jz...@apache.org>
>> *Cc:* bo yang <bo...@gmail.com>; Amogh Margoor <am...@qubole.com>;
>> Ryan Blue <rb...@netflix.com>; Ben Sidhom <si...@google.com.invalid>;
>> Spark Dev List <de...@spark.apache.org>; Christopher Crosbie <
>> crosbiec@google.com>; Griselda Cuevas <gr...@google.com>; Holden Karau <
>> holden@pigscanfly.ca>; Mayank Ahuja <ma...@qubole.com>; Kalyan
>> Sivakumar <ka...@qubole.com>; alfozan@fb.com <al...@fb.com>; Felix
>> Cheung <fe...@uber.com>; Matt Cheah <mc...@palantir.com>; Yifei Huang
>> (PD) <yi...@palantir.com>
>> *Subject:* Re: Enabling fully disaggregated shuffle on Spark
>>
>>
>>
>> That sounds great!
>>
>>
>>
>> On Wed, Nov 20, 2019 at 9:02 AM John Zhuge <jz...@apache.org> wrote:
>>
>> That will be great. Please send us the invite.
>>
>>
>>
>> On Wed, Nov 20, 2019 at 8:56 AM bo yang <bo...@gmail.com> wrote:
>>
>> Cool, thanks Ryan, John, Amogh for the reply! Great to see you
>> interested! Felix will have a Spark Scalability & Reliability Sync
>> meeting on Dec 4 1pm PST. We could discuss more details there. Do you want
>> to join?
>>
>>
>>
>> On Tue, Nov 19, 2019 at 4:23 PM Amogh Margoor <am...@qubole.com> wrote:
>>
>> We at Qubole are also looking at disaggregating shuffle on Spark. Would
>> love to collaborate and share learnings.
>>
>>
>>
>> Regards,
>>
>> Amogh
>>
>>
>>
>> On Tue, Nov 19, 2019 at 4:09 PM John Zhuge <jz...@apache.org> wrote:
>>
>> Great work, Bo! Would love to hear the details.
>>
>>
>>
>>
>>
>> On Tue, Nov 19, 2019 at 4:05 PM Ryan Blue <rb...@netflix.com.invalid>
>> wrote:
>>
>> I'm interested in remote shuffle services as well. I'd love to hear about
>> what you're using in production!
>>
>>
>>
>> rb
>>
>>
>>
>> On Tue, Nov 19, 2019 at 2:43 PM bo yang <bo...@gmail.com> wrote:
>>
>> Hi Ben,
>>
>>
>>
>> Thanks for the write-up! This is Bo from Uber. I am on Felix's team in
>> Seattle, working on disaggregated shuffle (we call it remote shuffle
>> service, RSS, internally). We have had RSS in production for a while and
>> learned a lot along the way (we tried quite a few techniques to improve
>> remote shuffle performance). We could share what we learned with the
>> community, and would also like to hear feedback and suggestions on how to
>> further improve remote shuffle performance. We could discuss this in more
>> detail if you or others are interested.
>>
>>
>>
>> Best,
>>
>> Bo
>>
>>
>>
>> On Fri, Nov 15, 2019 at 4:10 PM Ben Sidhom <si...@google.com.invalid>
>> wrote:
>>
>> I would like to start a conversation about extending the Spark shuffle
>> manager surface to support fully disaggregated shuffle implementations.
>> This is closely related to the work in SPARK-25299
>> <https://issues.apache.org/jira/browse/SPARK-25299>, which is focused on
>> refactoring the shuffle manager API (and in particular, SortShuffleManager)
>> to use a pluggable storage backend. The motivation for that SPIP is further
>> enabling Spark on Kubernetes.
>>
>>
>>
>> The motivation for this proposal is enabling full externalized
>> (disaggregated) shuffle service implementations. (Facebook’s Cosco
>> shuffle
>> <https://databricks.com/session/cosco-an-efficient-facebook-scale-shuffle-service>
>> is one example of such a disaggregated shuffle service.) These changes
>> allow the bulk of the shuffle to run in a remote service such that minimal
>> state resides in executors and local disk spill is minimized. The net
>> effect is increased job stability and performance improvements in certain
>> scenarios. These changes should work well with or are complementary to
>> SPARK-25299. Some or all points may be merged into that issue as
>> appropriate.
>>
>>
>>
>> Below is a description of each component of this proposal. These changes
>> can ideally be introduced incrementally. I would like to gather feedback
>> and gauge interest from others in the community to collaborate on this.
>> There are likely more points that would be useful to disaggregated shuffle
>> services. We can outline a more concrete plan after gathering enough input.
>> A working session could help us kick off this joint effort; maybe something
>> in the mid-January to mid-February timeframe (depending on interest and
>> availability). I’m happy to host at our Sunnyvale, CA offices.
>>
>>
>> Proposal
>>
>> Scheduling and re-executing tasks
>>
>> Allow coordination between the service and the Spark DAG scheduler as to
>> whether a given block/partition needs to be recomputed when a task fails or
>> when shuffle block data cannot be read. Having such coordination is
>> important, e.g., for suppressing recomputation after aborted executors or
>> for forcing late recomputation if the service internally acts as a cache.
>> One catchall solution is to have the shuffle manager provide an indication
>> of whether shuffle data is external to executors (or nodes). Another
>> option: allow the shuffle manager (likely on the driver) to be queried for
>> the existence of shuffle data for a given executor ID (or perhaps map task,
>> reduce task, etc.). Note that this is at the level of data the scheduler is
>> aware of (i.e., map/reduce partitions) rather than block IDs, which are
>> internal details for some shuffle managers.
>>
>> ShuffleManager API
>>
>> Add a heartbeat (keep-alive) mechanism to RDD shuffle output so that the
>> service knows that data is still active. This is one way to enable
>> time-/job-scoped data because a disaggregated shuffle service cannot rely
>> on robust communication with Spark and in general has a distinct lifecycle
>> from the Spark deployment(s) it talks to. This would likely take the form
>> of a callback on ShuffleManager itself, but there are other approaches.
>>
>>
>>
>> Add lifecycle hooks to shuffle readers and writers (e.g., to
>> close/recycle connections/streams/file handles as well as provide commit
>> semantics). SPARK-25299 adds commit semantics to the internal data storage
>> layer, but this is applicable to all shuffle managers at a higher level and
>> should apply equally to the ShuffleWriter.
>>
>>
>>
>> Do not require ShuffleManagers to expose ShuffleBlockResolvers where they
>> are not needed. Ideally, this would be an implementation detail of the
>> shuffle manager itself. If there is substantial overlap between the
>> SortShuffleManager and other implementations, then the storage details can
>> be abstracted at the appropriate level. (SPARK-25299 does not currently
>> change this.)
>>
>>
>>
>> Do not require MapStatus to include blockmanager IDs where they are not
>> relevant. This is captured by ShuffleBlockInfo
>> <https://docs.google.com/document/d/1d6egnL6WHOwWZe8MWv3m8n4PToNacdx7n_0iMSWwhCQ/edit#heading=h.imi27prnziyj>
>> including an *optional* BlockManagerId in SPARK-25299. However, this
>> change should be lifted to the MapStatus level so that it applies to all
>> ShuffleManagers. Alternatively, use a more general data-location
>> abstraction than BlockManagerId. This gives the shuffle manager more
>> flexibility and the scheduler more information with respect to data
>> residence.
>>
>> Serialization
>>
>> Allow serializers to be used more flexibly and efficiently. For example,
>> have serializers support writing an arbitrary number of objects into an
>> existing OutputStream or ByteBuffer. This enables objects to be serialized
>> to direct buffers where doing so makes sense. More importantly, it allows
>> arbitrary metadata/framing data to be wrapped around individual objects
>> cheaply. Right now, that’s only possible at the stream level. (There are
>> hacks around this, but this would enable more idiomatic use in efficient
>> shuffle implementations.)
>>
>>
>>
>> Have serializers indicate whether they are deterministic. This provides
>> much of the value of a shuffle service because it means that reducers do
>> not need to spill to disk when reading/merging/combining inputs--the data
>> can be grouped by the service, even without the service understanding data
>> types or byte representations. Alternative (less preferable since it would
>> break Java serialization, for example): require all serializers to be
>> deterministic.
>>
>>
>>
>>
>>
>> --
>>
>> - Ben
>>
>>
>>
>>
>> --
>>
>> Ryan Blue
>>
>> Software Engineer
>>
>> Netflix
>>
>>
>>
>>
>> --
>>
>> John Zhuge
>>
>>
>>
>>
>> --
>>
>> John Zhuge
>>
>>
>>
>>
>> --
>>
>> -Ben
>>
>>
>>
>>
>> --
>>
>> "...:::Aniket:::... Quetzalco@tl"
>>
>

Re: Enabling fully disaggregated shuffle on Spark

Posted by Greg Lee <li...@gmail.com>.
Hi Felix & Ben,

This is Li Hao from Baidu, on the same team as Linhong.

As mentioned in Linhong’s email, an independent disaggregated shuffle
service is also our solution, and a direction we continue to explore, for
improving the stability of Hadoop MR and Spark in production. We would
love to hear about this topic from the community and to share our experience.

Please add me to this event, thanks.

Best Regards
Li Hao


Re: Enabling fully disaggregated shuffle on Spark

Posted by "Liu,Linhong" <li...@baidu.com>.
Hi Felix & Ben,
This is Linhong from Baidu, based in Beijing. We are also using a disaggregated shuffle service internally (we call it DCE). We launched it in production 3 years ago for Hadoop shuffle. Last year we migrated Spark shuffle to the same DCE shuffle service, and stability improved a lot (we can handle more than 100T shuffle now).
It would be nice to have a Spark shuffle API that supports fully disaggregated shuffle, and my team and I would be very glad to share our experience and help on this topic.
So, if it’s possible, please add me to this event.

Thanks,
Liu, Linhong

From: Aniket Mokashi <an...@gmail.com>
Date: Thursday, November 21, 2019 at 2:12 PM
To: Felix Cheung <fe...@hotmail.com>
Cc: Ben Sidhom <si...@google.com.invalid>, John Zhuge <jz...@apache.org>, bo yang <bo...@gmail.com>, Amogh Margoor <am...@qubole.com>, Ryan Blue <rb...@netflix.com>, Spark Dev List <de...@spark.apache.org>, Christopher Crosbie <cr...@google.com>, Griselda Cuevas <gr...@google.com>, Holden Karau <ho...@pigscanfly.ca>, Mayank Ahuja <ma...@qubole.com>, Kalyan Sivakumar <ka...@qubole.com>, "alfozan@fb.com" <al...@fb.com>, Felix Cheung <fe...@uber.com>, Matt Cheah <mc...@palantir.com>, "Yifei Huang (PD)" <yi...@palantir.com>
Subject: Re: Enabling fully disaggregated shuffle on Spark

Felix - please add me to this event.

Ben - should we move this proposal to a doc and open it up for edits/comments?

On Wed, Nov 20, 2019 at 5:37 PM Felix Cheung <fe...@hotmail.com> wrote:
Great!

Due to a number of constraints, I won’t be sending the link directly here, but please r me and I will add you.


________________________________
From: Ben Sidhom <si...@google.com.INVALID>
Sent: Wednesday, November 20, 2019 9:10:01 AM
To: John Zhuge <jz...@apache.org>
Cc: bo yang <bo...@gmail.com>; Amogh Margoor <am...@qubole.com>; Ryan Blue <rb...@netflix.com>; Ben Sidhom <si...@google.com.invalid>; Spark Dev List <de...@spark.apache.org>; Christopher Crosbie <cr...@google.com>; Griselda Cuevas <gr...@google.com>; Holden Karau <ho...@pigscanfly.ca>; Mayank Ahuja <ma...@qubole.com>; Kalyan Sivakumar <ka...@qubole.com>; alfozan@fb.com <al...@fb.com>; Felix Cheung <fe...@uber.com>; Matt Cheah <mc...@palantir.com>; Yifei Huang (PD) <yi...@palantir.com>
Subject: Re: Enabling fully disaggregated shuffle on Spark

That sounds great!

On Wed, Nov 20, 2019 at 9:02 AM John Zhuge <jz...@apache.org> wrote:
That will be great. Please send us the invite.

On Wed, Nov 20, 2019 at 8:56 AM bo yang <bo...@gmail.com> wrote:
Cool, thanks Ryan, John, Amogh for the reply! Great to see you interested! Felix will have a Spark Scalability & Reliability Sync meeting on Dec 4 1pm PST. We could discuss more details there. Do you want to join?

On Tue, Nov 19, 2019 at 4:23 PM Amogh Margoor <am...@qubole.com> wrote:
We at Qubole are also looking at disaggregating shuffle on Spark. Would love to collaborate and share learnings.

Regards,
Amogh

On Tue, Nov 19, 2019 at 4:09 PM John Zhuge <jz...@apache.org> wrote:
Great work, Bo! Would love to hear the details.


On Tue, Nov 19, 2019 at 4:05 PM Ryan Blue <rb...@netflix.com.invalid> wrote:
I'm interested in remote shuffle services as well. I'd love to hear about what you're using in production!

rb

On Tue, Nov 19, 2019 at 2:43 PM bo yang <bo...@gmail.com> wrote:
Hi Ben,

Thanks for the write-up! This is Bo from Uber. I am on Felix's team in Seattle, working on disaggregated shuffle (we call it remote shuffle service, RSS, internally). We have had RSS in production for a while and learned a lot along the way (we tried quite a few techniques to improve remote shuffle performance). We could share what we learned with the community, and would also like to hear feedback and suggestions on how to further improve remote shuffle performance. We could discuss this in more detail if you or others are interested.

Best,
Bo

On Fri, Nov 15, 2019 at 4:10 PM Ben Sidhom <si...@google.com.invalid> wrote:

I would like to start a conversation about extending the Spark shuffle manager surface to support fully disaggregated shuffle implementations. This is closely related to the work in SPARK-25299<https://issues.apache.org/jira/browse/SPARK-25299>, which is focused on refactoring the shuffle manager API (and in particular, SortShuffleManager) to use a pluggable storage backend. The motivation for that SPIP is further enabling Spark on Kubernetes.



The motivation for this proposal is enabling full externalized (disaggregated) shuffle service implementations. (Facebook’s Cosco shuffle<https://databricks.com/session/cosco-an-efficient-facebook-scale-shuffle-service> is one example of such a disaggregated shuffle service.) These changes allow the bulk of the shuffle to run in a remote service such that minimal state resides in executors and local disk spill is minimized. The net effect is increased job stability and performance improvements in certain scenarios. These changes should work well with or are complementary to SPARK-25299. Some or all points may be merged into that issue as appropriate.



Below is a description of each component of this proposal. These changes can ideally be introduced incrementally. I would like to gather feedback and gauge interest from others in the community to collaborate on this. There are likely more points that would be useful to disaggregated shuffle services. We can outline a more concrete plan after gathering enough input. A working session could help us kick off this joint effort; maybe something in the mid-January to mid-February timeframe (depending on interest and availability). I’m happy to host at our Sunnyvale, CA offices.



Proposal
Scheduling and re-executing tasks

Allow coordination between the service and the Spark DAG scheduler as to whether a given block/partition needs to be recomputed when a task fails or when shuffle block data cannot be read. Having such coordination is important, e.g., for suppressing recomputation after aborted executors or for forcing late recomputation if the service internally acts as a cache. One catchall solution is to have the shuffle manager provide an indication of whether shuffle data is external to executors (or nodes). Another option: allow the shuffle manager (likely on the driver) to be queried for the existence of shuffle data for a given executor ID (or perhaps map task, reduce task, etc.). Note that this is at the level of data the scheduler is aware of (i.e., map/reduce partitions) rather than block IDs, which are internal details for some shuffle managers.
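
As a rough illustration of the second option, the sketch below models a driver-side tracker that a scheduler could query per map partition. It is written in Python for brevity and is not Spark code; ShuffleDataTracker, register_map_output, mark_lost, has_map_output and partitions_to_recompute are hypothetical names, not part of any Spark API.

```python
from dataclasses import dataclass, field


@dataclass
class ShuffleDataTracker:
    """Hypothetical driver-side view of which map outputs the service still holds."""

    # shuffle_id -> set of map partition ids whose output is available
    available: dict = field(default_factory=dict)

    def register_map_output(self, shuffle_id, map_id):
        self.available.setdefault(shuffle_id, set()).add(map_id)

    def mark_lost(self, shuffle_id, map_id):
        # Called when the service reports that it no longer has this output,
        # e.g. because it acts as a cache and evicted it.
        self.available.get(shuffle_id, set()).discard(map_id)

    def has_map_output(self, shuffle_id, map_id):
        return map_id in self.available.get(shuffle_id, set())


def partitions_to_recompute(tracker, shuffle_id, num_maps):
    """Map partitions the scheduler would need to re-run for this shuffle."""
    return [m for m in range(num_maps) if not tracker.has_map_output(shuffle_id, m)]


tracker = ShuffleDataTracker()
for map_id in range(4):
    tracker.register_map_output(0, map_id)

# Losing an executor changes nothing here: the data lives in the service.
# Only an explicit report from the service triggers recomputation.
tracker.mark_lost(0, 2)
print(partitions_to_recompute(tracker, 0, 4))
```

The query is keyed by shuffle and map partition, which is the level the scheduler already reasons about, so no block IDs leak out of the shuffle manager.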

ShuffleManager API

Add a heartbeat (keep-alive) mechanism to RDD shuffle output so that the service knows that data is still active. This is one way to enable time-/job-scoped data because a disaggregated shuffle service cannot rely on robust communication with Spark and in general has a distinct lifecycle from the Spark deployment(s) it talks to. This would likely take the form of a callback on ShuffleManager itself, but there are other approaches.
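
One way to picture the keep-alive idea is a lease table on the service side: each heartbeat renews a lease, and data whose lease has lapsed becomes eligible for cleanup. The sketch below is a simplified Python model under that assumption; ShuffleLeaseTable and the 30-second TTL are made up for illustration, and timestamps are passed in explicitly to keep the example deterministic.

```python
class ShuffleLeaseTable:
    """Hypothetical service-side table of shuffle outputs kept alive by heartbeats."""

    def __init__(self, ttl_seconds):
        self.ttl_seconds = ttl_seconds
        self._last_seen = {}  # shuffle_id -> time of the last heartbeat

    def heartbeat(self, shuffle_id, now):
        # Renew the lease for this shuffle's data.
        self._last_seen[shuffle_id] = now

    def is_active(self, shuffle_id):
        return shuffle_id in self._last_seen

    def expire(self, now):
        """Drop and return shuffles whose last heartbeat is older than the TTL."""
        expired = sorted(
            s for s, t in self._last_seen.items() if now - t > self.ttl_seconds
        )
        for s in expired:
            del self._last_seen[s]
        return expired


leases = ShuffleLeaseTable(ttl_seconds=30.0)
leases.heartbeat(shuffle_id=1, now=0.0)
leases.heartbeat(shuffle_id=2, now=0.0)
leases.heartbeat(shuffle_id=1, now=25.0)  # only shuffle 1 keeps reporting in

expired = leases.expire(now=40.0)
print(expired)
```

Because the service only relies on the absence of heartbeats, it can reclaim data even when the Spark application disappears without a clean shutdown.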



Add lifecycle hooks to shuffle readers and writers (e.g., to close/recycle connections/streams/file handles as well as provide commit semantics). SPARK-25299 adds commit semantics to the internal data storage layer, but this is applicable to all shuffle managers at a higher level and should apply equally to the ShuffleWriter.
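
The following Python sketch shows what commit/abort/close hooks on a writer could look like, assuming output becomes visible to readers only on commit. RemoteShuffleWriter is a hypothetical class and a plain dict stands in for the remote service; it is not the Spark ShuffleWriter interface.

```python
class RemoteShuffleWriter:
    """Hypothetical writer with explicit commit, abort and close hooks."""

    def __init__(self, service, map_id):
        self.service = service  # dict standing in for the remote shuffle service
        self.map_id = map_id
        self._pending = []
        self.closed = False

    def write(self, record):
        self._pending.append(record)

    def commit(self):
        # Output becomes visible to readers only at this point.
        self.service[self.map_id] = list(self._pending)

    def abort(self):
        # Discard partial output from a failed task attempt.
        self._pending.clear()

    def close(self):
        # Place to release connections, streams or file handles.
        self.closed = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        try:
            if exc_type is None:
                self.commit()
            else:
                self.abort()
        finally:
            self.close()
        return False  # do not swallow the task failure


service = {}
with RemoteShuffleWriter(service, map_id=0) as ok:
    ok.write(("k", 1))

try:
    with RemoteShuffleWriter(service, map_id=1) as failed:
        failed.write(("k", 2))
        raise RuntimeError("task failed")
except RuntimeError:
    pass

print(service)
```

Only the successful attempt leaves data behind, and both writers are closed regardless of outcome.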



Do not require ShuffleManagers to expose ShuffleBlockResolvers where they are not needed. Ideally, this would be an implementation detail of the shuffle manager itself. If there is substantial overlap between the SortShuffleManager and other implementations, then the storage details can be abstracted at the appropriate level. (SPARK-25299 does not currently change this.)



Do not require MapStatus to include blockmanager IDs where they are not relevant. This is captured by ShuffleBlockInfo<https://docs.google.com/document/d/1d6egnL6WHOwWZe8MWv3m8n4PToNacdx7n_0iMSWwhCQ/edit#heading=h.imi27prnziyj> including an optional BlockManagerId in SPARK-25299. However, this change should be lifted to the MapStatus level so that it applies to all ShuffleManagers. Alternatively, use a more general data-location abstraction than BlockManagerId. This gives the shuffle manager more flexibility and the scheduler more information with respect to data residence.
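
To make the data-location idea concrete, here is a small Python model in which a map output's location is optional and may be either an executor or a remote service. All names (ExecutorLocation, RemoteServiceLocation, MapOutputStatus, lost_with_executor) and the endpoint string are hypothetical; this is not the MapStatus or ShuffleBlockInfo definition from Spark or SPARK-25299.

```python
from dataclasses import dataclass
from typing import Optional, Union


@dataclass(frozen=True)
class ExecutorLocation:
    executor_id: str
    host: str


@dataclass(frozen=True)
class RemoteServiceLocation:
    endpoint: str


@dataclass(frozen=True)
class MapOutputStatus:
    map_id: int
    partition_sizes: tuple
    # None means the shuffle manager does not expose a location at all.
    location: Optional[Union[ExecutorLocation, RemoteServiceLocation]] = None


def lost_with_executor(status, executor_id):
    """True only if this output actually lived on the failed executor."""
    loc = status.location
    return isinstance(loc, ExecutorLocation) and loc.executor_id == executor_id


local = MapOutputStatus(0, (10, 20), ExecutorLocation("exec-1", "host-a"))
remote = MapOutputStatus(1, (5, 5), RemoteServiceLocation("shuffle-svc:9000"))
unknown = MapOutputStatus(2, (1, 1))

print([lost_with_executor(s, "exec-1") for s in (local, remote, unknown)])
```

With a location type like this, a scheduler can tell apart outputs that die with an executor from outputs that survive it.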

Serialization

Allow serializers to be used more flexibly and efficiently. For example, have serializers support writing an arbitrary number of objects into an existing OutputStream or ByteBuffer. This enables objects to be serialized to direct buffers where doing so makes sense. More importantly, it allows arbitrary metadata/framing data to be wrapped around individual objects cheaply. Right now, that’s only possible at the stream level. (There are hacks around this, but this would enable more idiomatic use in efficient shuffle implementations.)
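
The per-object framing described above can be sketched as follows, in Python rather than against Spark's serializer classes. Each object is serialized into an existing stream and wrapped with a small header; the header layout (4-byte length, 2-byte partition id) and the helper names write_framed and read_framed are assumptions made for this example.

```python
import io
import pickle
import struct

# Assumed frame header: 4-byte payload length, 2-byte partition id, big-endian.
HEADER = struct.Struct(">IH")


def write_framed(stream, partition_id, obj):
    """Serialize one object into an existing stream, prefixed by its frame header."""
    payload = pickle.dumps(obj)
    stream.write(HEADER.pack(len(payload), partition_id))
    stream.write(payload)


def read_framed(stream):
    """Yield (partition_id, object) pairs until the stream is exhausted."""
    while True:
        header = stream.read(HEADER.size)
        if not header:
            return
        length, partition_id = HEADER.unpack(header)
        yield partition_id, pickle.loads(stream.read(length))


buf = io.BytesIO()
write_framed(buf, 3, ("a", 1))
write_framed(buf, 7, ("b", 2))
buf.seek(0)

records = list(read_framed(buf))
print(records)
```

A service can route each frame by reading only the header, without deserializing the payload.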



Have serializers indicate whether they are deterministic. This provides much of the value of a shuffle service because it means that reducers do not need to spill to disk when reading/merging/combining inputs--the data can be grouped by the service, even without the service understanding data types or byte representations. Alternative (less preferable since it would break Java serialization, for example): require all serializers to be deterministic.
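
A small Python example of why determinism matters: if equal keys always serialize to the same bytes, a service can group records by comparing bytes alone. JSON with and without sorted keys stands in for a deterministic and a non-deterministic serializer; serialize_key and group_by_key_bytes are illustrative helpers, not Spark APIs.

```python
import json
from collections import defaultdict


def serialize_key(key, deterministic):
    # sort_keys=True gives equal dicts the same byte representation.
    return json.dumps(key, sort_keys=deterministic).encode("utf-8")


def group_by_key_bytes(records):
    """Group values by the raw bytes of their key, as an opaque service would."""
    groups = defaultdict(list)
    for key_bytes, value in records:
        groups[key_bytes].append(value)
    return groups


k1 = {"user": 1, "region": "eu"}
k2 = {"region": "eu", "user": 1}  # equal to k1, different insertion order

det = group_by_key_bytes(
    [(serialize_key(k1, True), "x"), (serialize_key(k2, True), "y")]
)
nondet = group_by_key_bytes(
    [(serialize_key(k1, False), "x"), (serialize_key(k2, False), "y")]
)

print(len(det), len(nondet))
```

With the deterministic form, both records land in one group; without it, the same logical key is split across two groups and the reducer would have to merge them after deserializing.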





--

- Ben


--
Ryan Blue
Software Engineer
Netflix


--
John Zhuge


--
John Zhuge


--
-Ben


--
"...:::Aniket:::... Quetzalco@tl"

Re: Enabling fully disaggregated shuffle on Spark

Posted by Peter Rudenko <pe...@gmail.com>.
Hi, Peter from Mellanox here.
I would be interested in this event. I've been working on accelerating
Spark shuffle using RDMA (Remote Direct Memory Access) technologies.
We're now in the process of releasing SparkUCX
(https://github.com/openucx/sparkucx), which accelerates Spark shuffle
using UCX (https://github.com/openucx/ucx), a high-performance
communication library that supports many HPC protocols (RDMA, active
messages, tag operations) over different transports (InfiniBand, CUDA,
TCP, etc.). We have seen good performance for network-intensive
shuffle applications compared to out-of-the-box TCP.

We're open to integrating UCX into other big data components (Apache
Arrow / Flight, HDFS, etc.) that could be reused in Spark to make whole
Spark workloads more effective.

We would be glad to hear about your use cases for optimizing Spark shuffle.

Regards,
Peter Rudenko

On Thu, Nov 21, 2019 at 08:12, Aniket Mokashi <an...@gmail.com> wrote:
>
> Felix - please add me to this event.
>
> Ben - should we move this proposal to a doc and open it up for edits/comments?
>
> On Wed, Nov 20, 2019 at 5:37 PM Felix Cheung <fe...@hotmail.com> wrote:
>>
>> Great!
>>
>> Due to a number of constraints, I won’t be sending the link directly here, but please r me and I will add you.
>>
>>
>> ________________________________
>> From: Ben Sidhom <si...@google.com.INVALID>
>> Sent: Wednesday, November 20, 2019 9:10:01 AM
>> To: John Zhuge <jz...@apache.org>
>> Cc: bo yang <bo...@gmail.com>; Amogh Margoor <am...@qubole.com>; Ryan Blue <rb...@netflix.com>; Ben Sidhom <si...@google.com.invalid>; Spark Dev List <de...@spark.apache.org>; Christopher Crosbie <cr...@google.com>; Griselda Cuevas <gr...@google.com>; Holden Karau <ho...@pigscanfly.ca>; Mayank Ahuja <ma...@qubole.com>; Kalyan Sivakumar <ka...@qubole.com>; alfozan@fb.com <al...@fb.com>; Felix Cheung <fe...@uber.com>; Matt Cheah <mc...@palantir.com>; Yifei Huang (PD) <yi...@palantir.com>
>> Subject: Re: Enabling fully disaggregated shuffle on Spark
>>
>> That sounds great!
>>
>> On Wed, Nov 20, 2019 at 9:02 AM John Zhuge <jz...@apache.org> wrote:
>>
>> That will be great. Please send us the invite.
>>
>> On Wed, Nov 20, 2019 at 8:56 AM bo yang <bo...@gmail.com> wrote:
>>
>> Cool, thanks Ryan, John, Amogh for the reply! Great to see you interested! Felix will have a Spark Scalability & Reliability Sync meeting on Dec 4 1pm PST. We could discuss more details there. Do you want to join?
>>
>> On Tue, Nov 19, 2019 at 4:23 PM Amogh Margoor <am...@qubole.com> wrote:
>>
>> We at Qubole are also looking at disaggregating shuffle on Spark. Would love to collaborate and share learnings.
>>
>> Regards,
>> Amogh
>>
>> On Tue, Nov 19, 2019 at 4:09 PM John Zhuge <jz...@apache.org> wrote:
>>
>> Great work, Bo! Would love to hear the details.
>>
>>
>> On Tue, Nov 19, 2019 at 4:05 PM Ryan Blue <rb...@netflix.com.invalid> wrote:
>>
>> I'm interested in remote shuffle services as well. I'd love to hear about what you're using in production!
>>
>> rb
>>
>> On Tue, Nov 19, 2019 at 2:43 PM bo yang <bo...@gmail.com> wrote:
>>
>> Hi Ben,
>>
>> Thanks for the write-up! This is Bo from Uber. I am on Felix's team in Seattle, working on disaggregated shuffle (we call it remote shuffle service, RSS, internally). We have had RSS in production for a while and learned a lot along the way (we tried quite a few techniques to improve remote shuffle performance). We could share what we learned with the community, and would also like to hear feedback and suggestions on how to further improve remote shuffle performance. We could discuss this in more detail if you or others are interested.
>>
>> Best,
>> Bo
>>
>> On Fri, Nov 15, 2019 at 4:10 PM Ben Sidhom <si...@google.com.invalid> wrote:
>>
>> I would like to start a conversation about extending the Spark shuffle manager surface to support fully disaggregated shuffle implementations. This is closely related to the work in SPARK-25299, which is focused on refactoring the shuffle manager API (and in particular, SortShuffleManager) to use a pluggable storage backend. The motivation for that SPIP is further enabling Spark on Kubernetes.
>>
>>
>> The motivation for this proposal is enabling fully externalized (disaggregated) shuffle service implementations. (Facebook’s Cosco shuffle is one example of such a disaggregated shuffle service.) These changes allow the bulk of the shuffle to run in a remote service such that minimal state resides in executors and local disk spill is minimized. The net effect is increased job stability and performance improvements in certain scenarios. These changes should work well with, or be complementary to, SPARK-25299. Some or all points may be merged into that issue as appropriate.
>>
>>
>> Below is a description of each component of this proposal. These changes can ideally be introduced incrementally. I would like to gather feedback and gauge interest from others in the community to collaborate on this. There are likely more points that would be useful to disaggregated shuffle services. We can outline a more concrete plan after gathering enough input. A working session could help us kick off this joint effort; maybe something in the mid-January to mid-February timeframe (depending on interest and availability). I’m happy to host at our Sunnyvale, CA offices.
>>
>>
>> Proposal
>>
>> Scheduling and re-executing tasks
>>
>> Allow coordination between the service and the Spark DAG scheduler as to whether a given block/partition needs to be recomputed when a task fails or when shuffle block data cannot be read. Having such coordination is important, e.g., for suppressing recomputation after aborted executors or for forcing late recomputation if the service internally acts as a cache. One catchall solution is to have the shuffle manager provide an indication of whether shuffle data is external to executors (or nodes). Another option: allow the shuffle manager (likely on the driver) to be queried for the existence of shuffle data for a given executor ID (or perhaps map task, reduce task, etc). Note that this is at the level of data the scheduler is aware of (i.e., map/reduce partitions) rather than block IDs, which are internal details for some shuffle managers.
>>
>> ShuffleManager API
>>
>> Add a heartbeat (keep-alive) mechanism to RDD shuffle output so that the service knows that data is still active. This is one way to enable time-/job-scoped data because a disaggregated shuffle service cannot rely on robust communication with Spark and in general has a distinct lifecycle from the Spark deployment(s) it talks to. This would likely take the form of a callback on ShuffleManager itself, but there are other approaches.
>>
>>
>> Add lifecycle hooks to shuffle readers and writers (e.g., to close/recycle connections/streams/file handles as well as provide commit semantics). SPARK-25299 adds commit semantics to the internal data storage layer, but this is applicable to all shuffle managers at a higher level and should apply equally to the ShuffleWriter.
>>
>>
>> Do not require ShuffleManagers to expose ShuffleBlockResolvers where they are not needed. Ideally, this would be an implementation detail of the shuffle manager itself. If there is substantial overlap between the SortShuffleManager and other implementations, then the storage details can be abstracted at the appropriate level. (SPARK-25299 does not currently change this.)
>>
>>
>> Do not require MapStatus to include blockmanager IDs where they are not relevant. This is captured by ShuffleBlockInfo including an optional BlockManagerId in SPARK-25299. However, this change should be lifted to the MapStatus level so that it applies to all ShuffleManagers. Alternatively, use a more general data-location abstraction than BlockManagerId. This gives the shuffle manager more flexibility and the scheduler more information with respect to data residence.
>>
>> Serialization
>>
>> Allow serializers to be used more flexibly and efficiently. For example, have serializers support writing an arbitrary number of objects into an existing OutputStream or ByteBuffer. This enables objects to be serialized to direct buffers where doing so makes sense. More importantly, it allows arbitrary metadata/framing data to be wrapped around individual objects cheaply. Right now, that’s only possible at the stream level. (There are hacks around this, but this would enable more idiomatic use in efficient shuffle implementations.)
>>
>>
>> Have serializers indicate whether they are deterministic. This provides much of the value of a shuffle service because it means that reducers do not need to spill to disk when reading/merging/combining inputs--the data can be grouped by the service, even without the service understanding data types or byte representations. Alternative (less preferable since it would break Java serialization, for example): require all serializers to be deterministic.
>>
>>
>>
>> --
>>
>> - Ben
>>
>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>>
>>
>> --
>> John Zhuge
>>
>>
>>
>> --
>> John Zhuge
>>
>>
>>
>> --
>> -Ben
>
>
>
> --
> "...:::Aniket:::... Quetzalco@tl"

---------------------------------------------------------------------
To unsubscribe e-mail: dev-unsubscribe@spark.apache.org


Re: Enabling fully disaggregated shuffle on Spark

Posted by Aniket Mokashi <an...@gmail.com>.
Felix - please add me to this event.

Ben - should we move this proposal to a doc and open it up for
edits/comments?

On Wed, Nov 20, 2019 at 5:37 PM Felix Cheung <fe...@hotmail.com>
wrote:

> Great!
>
> Due to a number of constraints I won’t be sending the link directly here, but
> please r me and I will add you.
>
>
> ------------------------------
> *From:* Ben Sidhom <si...@google.com.INVALID>
> *Sent:* Wednesday, November 20, 2019 9:10:01 AM
> *To:* John Zhuge <jz...@apache.org>
> *Cc:* bo yang <bo...@gmail.com>; Amogh Margoor <am...@qubole.com>;
> Ryan Blue <rb...@netflix.com>; Ben Sidhom <si...@google.com.invalid>;
> Spark Dev List <de...@spark.apache.org>; Christopher Crosbie <
> crosbiec@google.com>; Griselda Cuevas <gr...@google.com>; Holden Karau <
> holden@pigscanfly.ca>; Mayank Ahuja <ma...@qubole.com>; Kalyan Sivakumar
> <ka...@qubole.com>; alfozan@fb.com <al...@fb.com>; Felix Cheung <
> felixc@uber.com>; Matt Cheah <mc...@palantir.com>; Yifei Huang (PD) <
> yifeih@palantir.com>
> *Subject:* Re: Enabling fully disaggregated shuffle on Spark
>
> That sounds great!
>
> On Wed, Nov 20, 2019 at 9:02 AM John Zhuge <jz...@apache.org> wrote:
>
> That will be great. Please send us the invite.
>
> On Wed, Nov 20, 2019 at 8:56 AM bo yang <bo...@gmail.com> wrote:
>
> Cool, thanks Ryan, John, Amogh for the reply! Great to see you interested!
> Felix will have a Spark Scalability & Reliability Sync meeting on Dec 4 1pm
> PST. We could discuss more details there. Do you want to join?
>
> On Tue, Nov 19, 2019 at 4:23 PM Amogh Margoor <am...@qubole.com> wrote:
>
> We at Qubole are also looking at disaggregating shuffle on Spark. Would
> love to collaborate and share learnings.
>
> Regards,
> Amogh
>
> On Tue, Nov 19, 2019 at 4:09 PM John Zhuge <jz...@apache.org> wrote:
>
> Great work, Bo! Would love to hear the details.
>
>
> On Tue, Nov 19, 2019 at 4:05 PM Ryan Blue <rb...@netflix.com.invalid>
> wrote:
>
> I'm interested in remote shuffle services as well. I'd love to hear about
> what you're using in production!
>
> rb
>
> On Tue, Nov 19, 2019 at 2:43 PM bo yang <bo...@gmail.com> wrote:
>
> Hi Ben,
>
> Thanks for the write-up! This is Bo from Uber. I am on Felix's team in
> Seattle, working on disaggregated shuffle (we called it remote shuffle
> service, RSS, internally). We have had RSS in production for a while and
> learned a lot along the way (we tried quite a few techniques to improve
> remote shuffle performance). We could share what we have learned with the
> community, and would also like to hear feedback and suggestions on how to
> further improve remote shuffle performance. We could chat in more detail if
> you or other people are interested.
>
> Best,
> Bo
>
> On Fri, Nov 15, 2019 at 4:10 PM Ben Sidhom <si...@google.com.invalid>
> wrote:
>
> I would like to start a conversation about extending the Spark shuffle
> manager surface to support fully disaggregated shuffle implementations.
> This is closely related to the work in SPARK-25299
> <https://issues.apache.org/jira/browse/SPARK-25299>, which is focused on
> refactoring the shuffle manager API (and in particular, SortShuffleManager)
> to use a pluggable storage backend. The motivation for that SPIP is further
> enabling Spark on Kubernetes.
>
>
> The motivation for this proposal is enabling fully externalized
> (disaggregated) shuffle service implementations. (Facebook’s Cosco shuffle
> <https://databricks.com/session/cosco-an-efficient-facebook-scale-shuffle-service>
> is one example of such a disaggregated shuffle service.) These changes
> allow the bulk of the shuffle to run in a remote service such that minimal
> state resides in executors and local disk spill is minimized. The net
> effect is increased job stability and performance improvements in certain
> scenarios. These changes should work well with, or be complementary to,
> SPARK-25299. Some or all points may be merged into that issue as
> appropriate.
>
>
> Below is a description of each component of this proposal. These changes
> can ideally be introduced incrementally. I would like to gather feedback
> and gauge interest from others in the community to collaborate on this.
> There are likely more points that would be useful to disaggregated shuffle
> services. We can outline a more concrete plan after gathering enough input.
> A working session could help us kick off this joint effort; maybe something
> in the mid-January to mid-February timeframe (depending on interest and
> availability). I’m happy to host at our Sunnyvale, CA offices.
>
>
> Proposal
>
> Scheduling and re-executing tasks
>
> Allow coordination between the service and the Spark DAG scheduler as to
> whether a given block/partition needs to be recomputed when a task fails or
> when shuffle block data cannot be read. Having such coordination is
> important, e.g., for suppressing recomputation after aborted executors or
> for forcing late recomputation if the service internally acts as a cache.
> One catchall solution is to have the shuffle manager provide an indication
> of whether shuffle data is external to executors (or nodes). Another
> option: allow the shuffle manager (likely on the driver) to be queried for
> the existence of shuffle data for a given executor ID (or perhaps map task,
> reduce task, etc). Note that this is at the level of data the scheduler is
> aware of (i.e., map/reduce partitions) rather than block IDs, which are
> internal details for some shuffle managers.
>
> ShuffleManager API
>
> Add a heartbeat (keep-alive) mechanism to RDD shuffle output so that the
> service knows that data is still active. This is one way to enable
> time-/job-scoped data because a disaggregated shuffle service cannot rely
> on robust communication with Spark and in general has a distinct lifecycle
> from the Spark deployment(s) it talks to. This would likely take the form
> of a callback on ShuffleManager itself, but there are other approaches.
>
>
> Add lifecycle hooks to shuffle readers and writers (e.g., to close/recycle
> connections/streams/file handles as well as provide commit semantics).
> SPARK-25299 adds commit semantics to the internal data storage layer, but
> this is applicable to all shuffle managers at a higher level and should
> apply equally to the ShuffleWriter.
>
>
> Do not require ShuffleManagers to expose ShuffleBlockResolvers where they
> are not needed. Ideally, this would be an implementation detail of the
> shuffle manager itself. If there is substantial overlap between the
> SortShuffleManager and other implementations, then the storage details can
> be abstracted at the appropriate level. (SPARK-25299 does not currently
> change this.)
>
>
> Do not require MapStatus to include blockmanager IDs where they are not
> relevant. This is captured by ShuffleBlockInfo
> <https://docs.google.com/document/d/1d6egnL6WHOwWZe8MWv3m8n4PToNacdx7n_0iMSWwhCQ/edit#heading=h.imi27prnziyj>
> including an optional BlockManagerId in SPARK-25299. However, this change
> should be lifted to the MapStatus level so that it applies to all
> ShuffleManagers. Alternatively, use a more general data-location
> abstraction than BlockManagerId. This gives the shuffle manager more
> flexibility and the scheduler more information with respect to data
> residence.
>
> Serialization
>
> Allow serializers to be used more flexibly and efficiently. For example,
> have serializers support writing an arbitrary number of objects into an
> existing OutputStream or ByteBuffer. This enables objects to be serialized
> to direct buffers where doing so makes sense. More importantly, it allows
> arbitrary metadata/framing data to be wrapped around individual objects
> cheaply. Right now, that’s only possible at the stream level. (There are
> hacks around this, but this would enable more idiomatic use in efficient
> shuffle implementations.)
>
>
> Have serializers indicate whether they are deterministic. This provides
> much of the value of a shuffle service because it means that reducers do
> not need to spill to disk when reading/merging/combining inputs--the data
> can be grouped by the service, even without the service understanding data
> types or byte representations. Alternative (less preferable since it would
> break Java serialization, for example): require all serializers to be
> deterministic.
>
>
>
> --
>
> - Ben
>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>
>
>
> --
> John Zhuge
>
>
>
> --
> John Zhuge
>
>
>
> --
> -Ben
>


-- 
"...:::Aniket:::... Quetzalco@tl"

RE: Enabling fully disaggregated shuffle on Spark

Posted by Prakhar Jain <Ja...@microsoft.com.INVALID>.
Great work Ben. At Microsoft, we are also working on disaggregating shuffle from Spark. Please add me to the invite.

From: Felix Cheung <fe...@hotmail.com>
Sent: 21 November 2019 07:07
To: Ben Sidhom <si...@google.com.INVALID>; John Zhuge <jz...@apache.org>
Cc: bo yang <bo...@gmail.com>; Amogh Margoor <am...@qubole.com>; Ryan Blue <rb...@netflix.com>; Ben Sidhom <si...@google.com.INVALID>; Spark Dev List <de...@spark.apache.org>; Christopher Crosbie <cr...@google.com>; Griselda Cuevas <gr...@google.com>; Holden Karau <ho...@pigscanfly.ca>; Mayank Ahuja <ma...@qubole.com>; Kalyan Sivakumar <ka...@qubole.com>; alfozan@fb.com; Felix Cheung <fe...@uber.com>; Matt Cheah <mc...@palantir.com>; Yifei Huang (PD) <yi...@palantir.com>
Subject: Re: Enabling fully disaggregated shuffle on Spark

Great!

Due to a number of constraints I won't be sending the link directly here, but please r me and I will add you.


________________________________
From: Ben Sidhom <si...@google.com.INVALID>
Sent: Wednesday, November 20, 2019 9:10:01 AM
To: John Zhuge <jz...@apache.org>
Cc: bo yang <bo...@gmail.com>; Amogh Margoor <am...@qubole.com>; Ryan Blue <rb...@netflix.com>; Ben Sidhom <si...@google.com.invalid>; Spark Dev List <de...@spark.apache.org>; Christopher Crosbie <cr...@google.com>; Griselda Cuevas <gr...@google.com>; Holden Karau <ho...@pigscanfly.ca>; Mayank Ahuja <ma...@qubole.com>; Kalyan Sivakumar <ka...@qubole.com>; alfozan@fb.com <al...@fb.com>; Felix Cheung <fe...@uber.com>; Matt Cheah <mc...@palantir.com>; Yifei Huang (PD) <yi...@palantir.com>
Subject: Re: Enabling fully disaggregated shuffle on Spark

That sounds great!

On Wed, Nov 20, 2019 at 9:02 AM John Zhuge <jz...@apache.org> wrote:
That will be great. Please send us the invite.

On Wed, Nov 20, 2019 at 8:56 AM bo yang <bo...@gmail.com> wrote:
Cool, thanks Ryan, John, Amogh for the reply! Great to see you interested! Felix will have a Spark Scalability & Reliability Sync meeting on Dec 4 1pm PST. We could discuss more details there. Do you want to join?

On Tue, Nov 19, 2019 at 4:23 PM Amogh Margoor <am...@qubole.com> wrote:
We at Qubole are also looking at disaggregating shuffle on Spark. Would love to collaborate and share learnings.

Regards,
Amogh

On Tue, Nov 19, 2019 at 4:09 PM John Zhuge <jz...@apache.org> wrote:
Great work, Bo! Would love to hear the details.


On Tue, Nov 19, 2019 at 4:05 PM Ryan Blue <rb...@netflix.com.invalid> wrote:
I'm interested in remote shuffle services as well. I'd love to hear about what you're using in production!

rb

On Tue, Nov 19, 2019 at 2:43 PM bo yang <bo...@gmail.com> wrote:
Hi Ben,

Thanks for the write-up! This is Bo from Uber. I am on Felix's team in Seattle, working on disaggregated shuffle (we called it remote shuffle service, RSS, internally). We have had RSS in production for a while and learned a lot along the way (we tried quite a few techniques to improve remote shuffle performance). We could share what we have learned with the community, and would also like to hear feedback and suggestions on how to further improve remote shuffle performance. We could chat in more detail if you or other people are interested.

Best,
Bo

On Fri, Nov 15, 2019 at 4:10 PM Ben Sidhom <si...@google.com.invalid> wrote:

I would like to start a conversation about extending the Spark shuffle manager surface to support fully disaggregated shuffle implementations. This is closely related to the work in SPARK-25299<https://issues.apache.org/jira/browse/SPARK-25299>, which is focused on refactoring the shuffle manager API (and in particular, SortShuffleManager) to use a pluggable storage backend. The motivation for that SPIP is further enabling Spark on Kubernetes.



The motivation for this proposal is enabling fully externalized (disaggregated) shuffle service implementations. (Facebook's Cosco shuffle<https://databricks.com/session/cosco-an-efficient-facebook-scale-shuffle-service> is one example of such a disaggregated shuffle service.) These changes allow the bulk of the shuffle to run in a remote service such that minimal state resides in executors and local disk spill is minimized. The net effect is increased job stability and performance improvements in certain scenarios. These changes should work well with, or be complementary to, SPARK-25299. Some or all points may be merged into that issue as appropriate.



Below is a description of each component of this proposal. These changes can ideally be introduced incrementally. I would like to gather feedback and gauge interest from others in the community to collaborate on this. There are likely more points that would be useful to disaggregated shuffle services. We can outline a more concrete plan after gathering enough input. A working session could help us kick off this joint effort; maybe something in the mid-January to mid-February timeframe (depending on interest and availability). I'm happy to host at our Sunnyvale, CA offices.



Proposal
Scheduling and re-executing tasks

Allow coordination between the service and the Spark DAG scheduler as to whether a given block/partition needs to be recomputed when a task fails or when shuffle block data cannot be read. Having such coordination is important, e.g., for suppressing recomputation after aborted executors or for forcing late recomputation if the service internally acts as a cache. One catchall solution is to have the shuffle manager provide an indication of whether shuffle data is external to executors (or nodes). Another option: allow the shuffle manager (likely on the driver) to be queried for the existence of shuffle data for a given executor ID (or perhaps map task, reduce task, etc). Note that this is at the level of data the scheduler is aware of (i.e., map/reduce partitions) rather than block IDs, which are internal details for some shuffle managers.
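
To make the second option concrete, here is a minimal sketch of a driver-side existence query at the map-partition level. The names ShuffleDataTracker, InMemoryShuffleDataTracker and RecomputePolicy are hypothetical and are not part of Spark or SPARK-25299; the in-memory set stands in for a call to the remote service.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical driver-side view of the shuffle data a manager can still serve. */
interface ShuffleDataTracker {
    /** True if shuffle data lives outside executors and survives executor loss. */
    boolean isExternalToExecutors();

    /** True if the output of the given map partition is still readable. */
    boolean hasMapOutput(int shuffleId, int mapPartition);
}

/** In-memory stand-in; a real tracker would ask the remote service instead. */
final class InMemoryShuffleDataTracker implements ShuffleDataTracker {
    private final Set<Long> available = new HashSet<>();

    // Pack (shuffleId, mapPartition) into one long so it can be used as a set key.
    private static long key(int shuffleId, int mapPartition) {
        return ((long) shuffleId << 32) | (mapPartition & 0xFFFFFFFFL);
    }

    void markAvailable(int shuffleId, int mapPartition) {
        available.add(key(shuffleId, mapPartition));
    }

    void markLost(int shuffleId, int mapPartition) {
        available.remove(key(shuffleId, mapPartition));
    }

    @Override
    public boolean isExternalToExecutors() {
        return true;
    }

    @Override
    public boolean hasMapOutput(int shuffleId, int mapPartition) {
        return available.contains(key(shuffleId, mapPartition));
    }
}

final class RecomputePolicy {
    private RecomputePolicy() {}

    /** Decides whether a map partition must be rerun after its executor was lost. */
    static boolean needsRecompute(ShuffleDataTracker tracker, int shuffleId, int mapPartition) {
        if (!tracker.isExternalToExecutors()) {
            return true; // the output was stored on the lost executor
        }
        return !tracker.hasMapOutput(shuffleId, mapPartition);
    }
}
```

The query is keyed by shuffle ID and map partition rather than by block ID, which matches the level of detail the scheduler already tracks.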

ShuffleManager API

Add a heartbeat (keep-alive) mechanism to RDD shuffle output so that the service knows that data is still active. This is one way to enable time-/job-scoped data because a disaggregated shuffle service cannot rely on robust communication with Spark and in general has a distinct lifecycle from the Spark deployment(s) it talks to. This would likely take the form of a callback on ShuffleManager itself, but there are other approaches.
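
One possible shape for the service side of such a heartbeat is a lease table: each heartbeat renews a lease, and data whose lease has lapsed becomes eligible for cleanup. This is only a sketch under assumptions. ShuffleLeaseTable and its methods are hypothetical names, and the clock is passed in explicitly to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical service-side lease table keyed by shuffle ID. */
final class ShuffleLeaseTable {
    private final long ttlMillis;
    private final Map<Integer, Long> lastSeen = new HashMap<>();

    ShuffleLeaseTable(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Called whenever the Spark side reports that a shuffle is still in use. */
    void heartbeat(int shuffleId, long nowMillis) {
        lastSeen.put(shuffleId, nowMillis);
    }

    boolean isLive(int shuffleId) {
        return lastSeen.containsKey(shuffleId);
    }

    /** Drops and returns every shuffle whose last heartbeat is older than the TTL. */
    List<Integer> expire(long nowMillis) {
        List<Integer> expired = new ArrayList<>();
        Iterator<Map.Entry<Integer, Long>> it = lastSeen.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, Long> entry = it.next();
            if (nowMillis - entry.getValue() > ttlMillis) {
                expired.add(entry.getKey());
                it.remove();
            }
        }
        return expired;
    }
}
```

With this shape the service never needs a reliable "job finished" signal from Spark: a driver that disappears simply stops renewing its leases.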



Add lifecycle hooks to shuffle readers and writers (e.g., to close/recycle connections/streams/file handles as well as provide commit semantics). SPARK-25299 adds commit semantics to the internal data storage layer, but this is applicable to all shuffle managers at a higher level and should apply equally to the ShuffleWriter.
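
As an illustration of what writer-level lifecycle hooks might look like, the sketch below pairs an explicit commit with a close that releases resources and discards anything uncommitted. ManagedShuffleWriter and BufferingShuffleWriter are hypothetical names, not existing Spark types, and the shared list stands in for whatever the shuffle service makes visible to readers.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical writer contract with explicit commit and close hooks. */
interface ManagedShuffleWriter extends AutoCloseable {
    void write(byte[] record);

    /** Makes everything written so far visible to readers. */
    void commit();

    /** Releases resources; records that were never committed are discarded. */
    @Override
    void close();
}

/** In-memory stand-in that publishes records to a shared list on commit. */
final class BufferingShuffleWriter implements ManagedShuffleWriter {
    private final List<byte[]> pending = new ArrayList<>();
    private final List<byte[]> published;
    private boolean finished = false;

    BufferingShuffleWriter(List<byte[]> published) {
        this.published = published;
    }

    @Override
    public void write(byte[] record) {
        if (finished) {
            throw new IllegalStateException("writer is committed or closed");
        }
        pending.add(record);
    }

    @Override
    public void commit() {
        if (finished) {
            throw new IllegalStateException("writer is committed or closed");
        }
        published.addAll(pending);
        pending.clear();
        finished = true;
    }

    @Override
    public void close() {
        pending.clear(); // abort semantics for a task that failed before commit
        finished = true;
    }
}
```

Because the contract extends AutoCloseable, a task can use try-with-resources so that a failure before commit() never leaves partial output visible.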



Do not require ShuffleManagers to expose ShuffleBlockResolvers where they are not needed. Ideally, this would be an implementation detail of the shuffle manager itself. If there is substantial overlap between the SortShuffleManager and other implementations, then the storage details can be abstracted at the appropriate level. (SPARK-25299 does not currently change this.)



Do not require MapStatus to include blockmanager IDs where they are not relevant. This is captured by ShuffleBlockInfo<https://docs.google.com/document/d/1d6egnL6WHOwWZe8MWv3m8n4PToNacdx7n_0iMSWwhCQ/edit#heading=h.imi27prnziyj> including an optional BlockManagerId in SPARK-25299. However, this change should be lifted to the MapStatus level so that it applies to all ShuffleManagers. Alternatively, use a more general data-location abstraction than BlockManagerId. This gives the shuffle manager more flexibility and the scheduler more information with respect to data residence.
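
A more general data-location abstraction could be as small as the sketch below. ShuffleDataLocation is a hypothetical type invented for illustration; it is not the ShuffleBlockInfo from SPARK-25299, and the executor ID and endpoint strings are placeholders.

```java
import java.util.Objects;
import java.util.Optional;

/** Hypothetical location descriptor that a MapStatus-like record could carry. */
final class ShuffleDataLocation {
    private final String executorId;      // null when the data is not tied to an executor
    private final String serviceEndpoint; // null when the data is executor-local

    private ShuffleDataLocation(String executorId, String serviceEndpoint) {
        this.executorId = executorId;
        this.serviceEndpoint = serviceEndpoint;
    }

    static ShuffleDataLocation onExecutor(String executorId) {
        return new ShuffleDataLocation(Objects.requireNonNull(executorId), null);
    }

    static ShuffleDataLocation inService(String serviceEndpoint) {
        return new ShuffleDataLocation(null, Objects.requireNonNull(serviceEndpoint));
    }

    Optional<String> executorId() {
        return Optional.ofNullable(executorId);
    }

    Optional<String> serviceEndpoint() {
        return Optional.ofNullable(serviceEndpoint);
    }

    /** True if losing the given executor also loses the data at this location. */
    boolean isLostWith(String failedExecutorId) {
        return failedExecutorId.equals(executorId);
    }
}
```

The scheduler only needs to ask isLostWith when an executor fails; it does not have to know how a particular shuffle manager stores its data.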

Serialization

Allow serializers to be used more flexibly and efficiently. For example, have serializers support writing an arbitrary number of objects into an existing OutputStream or ByteBuffer. This enables objects to be serialized to direct buffers where doing so makes sense. More importantly, it allows arbitrary metadata/framing data to be wrapped around individual objects cheaply. Right now, that's only possible at the stream level. (There are hacks around this, but this would enable more idiomatic use in efficient shuffle implementations.)
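
The per-object framing described here can be sketched as follows, assuming a serializer that can append a single object to a caller-owned stream. ObjectEncoder, FramedRecordWriter and Utf8Encoder are hypothetical names, and the frame layout (partition ID, payload length, payload) is only one example of metadata a shuffle implementation might wrap around each object.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical per-object serializer: appends one value to a caller-owned stream. */
interface ObjectEncoder<T> {
    void encode(T value, OutputStream out) throws IOException;
}

/** Wraps each object in a small frame: int partitionId, int payloadLength, payload. */
final class FramedRecordWriter<T> {
    private final ObjectEncoder<T> encoder;
    private final DataOutputStream out;
    private final ByteArrayOutputStream scratch = new ByteArrayOutputStream();

    FramedRecordWriter(ObjectEncoder<T> encoder, OutputStream sink) {
        this.encoder = encoder;
        this.out = new DataOutputStream(sink);
    }

    void write(int partitionId, T value) throws IOException {
        scratch.reset();                // reuse one buffer across records
        encoder.encode(value, scratch); // serialize exactly one object
        out.writeInt(partitionId);
        out.writeInt(scratch.size());
        scratch.writeTo(out);
    }
}

/** Trivial encoder used only to exercise the framing. */
final class Utf8Encoder implements ObjectEncoder<String> {
    @Override
    public void encode(String value, OutputStream out) throws IOException {
        out.write(value.getBytes(StandardCharsets.UTF_8));
    }
}
```

The scratch buffer is needed here only because the length prefix precedes the payload; a serializer that could write straight into a ByteBuffer would let the length be patched in afterwards without the extra copy.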



Have serializers indicate whether they are deterministic. This provides much of the value of a shuffle service because it means that reducers do not need to spill to disk when reading/merging/combining inputs--the data can be grouped by the service, even without the service understanding data types or byte representations. Alternative (less preferable since it would break Java serialization, for example): require all serializers to be deterministic.
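
To show why the determinism flag matters, the sketch below groups records purely by their serialized key bytes, which is only safe when equal keys always serialize to equal bytes. EncodedRecord and ByteLevelGrouper are hypothetical names, and the boolean parameter stands in for whatever flag a serializer would expose.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** A record as the service sees it: opaque key bytes and opaque value bytes. */
final class EncodedRecord {
    final byte[] key;
    final byte[] value;

    EncodedRecord(byte[] key, byte[] value) {
        this.key = key;
        this.value = value;
    }
}

final class ByteLevelGrouper {
    private ByteLevelGrouper() {}

    /** Groups values by serialized key without deserializing anything. */
    static Map<ByteBuffer, List<byte[]>> groupByKeyBytes(
            boolean serializerIsDeterministic, List<EncodedRecord> records) {
        if (!serializerIsDeterministic) {
            throw new IllegalArgumentException(
                "equal keys may encode to different bytes; group in the reducer instead");
        }
        Map<ByteBuffer, List<byte[]>> groups = new LinkedHashMap<>();
        for (EncodedRecord record : records) {
            // ByteBuffer compares by content, so equal byte sequences share one group.
            groups.computeIfAbsent(ByteBuffer.wrap(record.key), k -> new ArrayList<>())
                  .add(record.value);
        }
        return groups;
    }
}
```

With a non-deterministic serializer the same logical key could land in several groups, which is why the sketch refuses to group in that case.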





--

- Ben


--
Ryan Blue
Software Engineer
Netflix


--
John Zhuge


--
John Zhuge


--
-Ben

Re: Enabling fully disaggregated shuffle on Spark

Posted by Felix Cheung <fe...@hotmail.com>.
Great!

Due to a number of constraints I won’t be sending the link directly here, but please r me and I will add you.


________________________________
From: Ben Sidhom <si...@google.com.INVALID>
Sent: Wednesday, November 20, 2019 9:10:01 AM
To: John Zhuge <jz...@apache.org>
Cc: bo yang <bo...@gmail.com>; Amogh Margoor <am...@qubole.com>; Ryan Blue <rb...@netflix.com>; Ben Sidhom <si...@google.com.invalid>; Spark Dev List <de...@spark.apache.org>; Christopher Crosbie <cr...@google.com>; Griselda Cuevas <gr...@google.com>; Holden Karau <ho...@pigscanfly.ca>; Mayank Ahuja <ma...@qubole.com>; Kalyan Sivakumar <ka...@qubole.com>; alfozan@fb.com <al...@fb.com>; Felix Cheung <fe...@uber.com>; Matt Cheah <mc...@palantir.com>; Yifei Huang (PD) <yi...@palantir.com>
Subject: Re: Enabling fully disaggregated shuffle on Spark

That sounds great!

On Wed, Nov 20, 2019 at 9:02 AM John Zhuge <jz...@apache.org> wrote:
That will be great. Please send us the invite.

On Wed, Nov 20, 2019 at 8:56 AM bo yang <bo...@gmail.com> wrote:
Cool, thanks Ryan, John, Amogh for the reply! Great to see you interested! Felix will have a Spark Scalability & Reliability Sync meeting on Dec 4 1pm PST. We could discuss more details there. Do you want to join?

On Tue, Nov 19, 2019 at 4:23 PM Amogh Margoor <am...@qubole.com> wrote:
We at Qubole are also looking at disaggregating shuffle on Spark. Would love to collaborate and share learnings.

Regards,
Amogh

On Tue, Nov 19, 2019 at 4:09 PM John Zhuge <jz...@apache.org> wrote:
Great work, Bo! Would love to hear the details.


On Tue, Nov 19, 2019 at 4:05 PM Ryan Blue <rb...@netflix.com.invalid> wrote:
I'm interested in remote shuffle services as well. I'd love to hear about what you're using in production!

rb

On Tue, Nov 19, 2019 at 2:43 PM bo yang <bo...@gmail.com> wrote:
Hi Ben,

Thanks for the write-up! This is Bo from Uber. I am on Felix's team in Seattle, working on disaggregated shuffle (we called it remote shuffle service, RSS, internally). We have had RSS in production for a while and learned a lot along the way (we tried quite a few techniques to improve remote shuffle performance). We could share what we have learned with the community, and would also like to hear feedback and suggestions on how to further improve remote shuffle performance. We could chat in more detail if you or other people are interested.

Best,
Bo

On Fri, Nov 15, 2019 at 4:10 PM Ben Sidhom <si...@google.com.invalid> wrote:

I would like to start a conversation about extending the Spark shuffle manager surface to support fully disaggregated shuffle implementations. This is closely related to the work in SPARK-25299<https://issues.apache.org/jira/browse/SPARK-25299>, which is focused on refactoring the shuffle manager API (and in particular, SortShuffleManager) to use a pluggable storage backend. The motivation for that SPIP is further enabling Spark on Kubernetes.


The motivation for this proposal is enabling fully externalized (disaggregated) shuffle service implementations. (Facebook’s Cosco shuffle<https://databricks.com/session/cosco-an-efficient-facebook-scale-shuffle-service> is one example of such a disaggregated shuffle service.) These changes allow the bulk of the shuffle to run in a remote service such that minimal state resides in executors and local disk spill is minimized. The net effect is increased job stability and performance improvements in certain scenarios. These changes should work well with, or be complementary to, SPARK-25299. Some or all points may be merged into that issue as appropriate.


Below is a description of each component of this proposal. These changes can ideally be introduced incrementally. I would like to gather feedback and gauge interest from others in the community to collaborate on this. There are likely more points that would be useful to disaggregated shuffle services. We can outline a more concrete plan after gathering enough input. A working session could help us kick off this joint effort; maybe something in the mid-January to mid-February timeframe (depending on interest and availability). I’m happy to host at our Sunnyvale, CA offices.


Proposal
Scheduling and re-executing tasks

Allow coordination between the service and the Spark DAG scheduler as to whether a given block/partition needs to be recomputed when a task fails or when shuffle block data cannot be read. Having such coordination is important, e.g., for suppressing recomputation after aborted executors or for forcing late recomputation if the service internally acts as a cache. One catchall solution is to have the shuffle manager provide an indication of whether shuffle data is external to executors (or nodes). Another option: allow the shuffle manager (likely on the driver) to be queried for the existence of shuffle data for a given executor ID (or perhaps map task, reduce task, etc). Note that this is at the level of data the scheduler is aware of (i.e., map/reduce partitions) rather than block IDs, which are internal details for some shuffle managers.

ShuffleManager API

Add a heartbeat (keep-alive) mechanism to RDD shuffle output so that the service knows that data is still active. This is one way to enable time-/job-scoped data because a disaggregated shuffle service cannot rely on robust communication with Spark and in general has a distinct lifecycle from the Spark deployment(s) it talks to. This would likely take the form of a callback on ShuffleManager itself, but there are other approaches.


Add lifecycle hooks to shuffle readers and writers (e.g., to close/recycle connections/streams/file handles as well as provide commit semantics). SPARK-25299 adds commit semantics to the internal data storage layer, but this is applicable to all shuffle managers at a higher level and should apply equally to the ShuffleWriter.


Do not require ShuffleManagers to expose ShuffleBlockResolvers where they are not needed. Ideally, this would be an implementation detail of the shuffle manager itself. If there is substantial overlap between the SortShuffleManager and other implementations, then the storage details can be abstracted at the appropriate level. (SPARK-25299 does not currently change this.)


Do not require MapStatus to include blockmanager IDs where they are not relevant. This is captured by ShuffleBlockInfo<https://docs.google.com/document/d/1d6egnL6WHOwWZe8MWv3m8n4PToNacdx7n_0iMSWwhCQ/edit#heading=h.imi27prnziyj> including an optional BlockManagerId in SPARK-25299. However, this change should be lifted to the MapStatus level so that it applies to all ShuffleManagers. Alternatively, use a more general data-location abstraction than BlockManagerId. This gives the shuffle manager more flexibility and the scheduler more information with respect to data residence.

Serialization

Allow serializers to be used more flexibly and efficiently. For example, have serializers support writing an arbitrary number of objects into an existing OutputStream or ByteBuffer. This enables objects to be serialized to direct buffers where doing so makes sense. More importantly, it allows arbitrary metadata/framing data to be wrapped around individual objects cheaply. Right now, that’s only possible at the stream level. (There are hacks around this, but this would enable more idiomatic use in efficient shuffle implementations.)


Have serializers indicate whether they are deterministic. This provides much of the value of a shuffle service because it means that reducers do not need to spill to disk when reading/merging/combining inputs--the data can be grouped by the service, even without the service understanding data types or byte representations. Alternative (less preferable since it would break Java serialization, for example): require all serializers to be deterministic.



--

- Ben


--
Ryan Blue
Software Engineer
Netflix


--
John Zhuge


--
John Zhuge


--
-Ben

Re: Enabling fully disaggregated shuffle on Spark

Posted by Ben Sidhom <si...@google.com.INVALID>.
That sounds great!

On Wed, Nov 20, 2019 at 9:02 AM John Zhuge <jz...@apache.org> wrote:

> That will be great. Please send us the invite.
>
> On Wed, Nov 20, 2019 at 8:56 AM bo yang <bo...@gmail.com> wrote:
>
>> Cool, thanks Ryan, John, Amogh for the reply! Great to see you
>> interested! Felix will have a Spark Scalability & Reliability Sync
>> meeting on Dec 4 1pm PST. We could discuss more details there. Do you want
>> to join?
>>
>> On Tue, Nov 19, 2019 at 4:23 PM Amogh Margoor <am...@qubole.com> wrote:
>>
>>> We at Qubole are also looking at disaggregating shuffle on Spark. Would
>>> love to collaborate and share learnings.
>>>
>>> Regards,
>>> Amogh
>>>
>>> On Tue, Nov 19, 2019 at 4:09 PM John Zhuge <jz...@apache.org> wrote:
>>>
>>>> Great work, Bo! Would love to hear the details.
>>>>
>>>>
>>>> On Tue, Nov 19, 2019 at 4:05 PM Ryan Blue <rb...@netflix.com.invalid>
>>>> wrote:
>>>>
>>>>> I'm interested in remote shuffle services as well. I'd love to hear
>>>>> about what you're using in production!
>>>>>
>>>>> rb
>>>>>
>>>>> On Tue, Nov 19, 2019 at 2:43 PM bo yang <bo...@gmail.com> wrote:
>>>>>
>>>>>> Hi Ben,
>>>>>>
>>>>>> Thanks for the write-up! This is Bo from Uber. I am on Felix's team
>>>>>> in Seattle, working on disaggregated shuffle (we call it remote
>>>>>> shuffle service, RSS, internally). We have had RSS in production for a
>>>>>> while and learned a lot along the way (we tried quite a few techniques to
>>>>>> improve remote shuffle performance). We could share what we learned with
>>>>>> the community, and would also like to hear feedback/suggestions on how to
>>>>>> further improve remote shuffle performance. We could discuss in more detail if
>>>>>> you or other people are interested.
>>>>>>
>>>>>> Best,
>>>>>> Bo
>>>>>>
>>>>>
>>>>> --
>>>>> Ryan Blue
>>>>> Software Engineer
>>>>> Netflix
>>>>>
>>>>
>>>>
>>>> --
>>>> John Zhuge
>>>>
>>>
>
> --
> John Zhuge
>


-- 
-Ben

Re: Enabling fully disaggregated shuffle on Spark

Posted by John Zhuge <jz...@apache.org>.
That will be great. Please send us the invite.

On Wed, Nov 20, 2019 at 8:56 AM bo yang <bo...@gmail.com> wrote:

> Cool, thanks Ryan, John, and Amogh for the replies! Great to see your interest!
> Felix will hold a Spark Scalability & Reliability Sync meeting on Dec 4 at 1pm
> PST. We could discuss more details there. Would you like to join?
>
> On Tue, Nov 19, 2019 at 4:23 PM Amogh Margoor <am...@qubole.com> wrote:
>
>> We at Qubole are also looking at disaggregating shuffle on Spark. Would
>> love to collaborate and share learnings.
>>
>> Regards,
>> Amogh
>>
>> On Tue, Nov 19, 2019 at 4:09 PM John Zhuge <jz...@apache.org> wrote:
>>
>>> Great work, Bo! Would love to hear the details.
>>>
>>>
>>> On Tue, Nov 19, 2019 at 4:05 PM Ryan Blue <rb...@netflix.com.invalid>
>>> wrote:
>>>
>>>> I'm interested in remote shuffle services as well. I'd love to hear
>>>> about what you're using in production!
>>>>
>>>> rb
>>>>
>>>> On Tue, Nov 19, 2019 at 2:43 PM bo yang <bo...@gmail.com> wrote:
>>>>
>>>>> Hi Ben,
>>>>>
>>>>> Thanks for the write-up! This is Bo from Uber. I am on Felix's team
>>>>> in Seattle, working on disaggregated shuffle (we call it remote
>>>>> shuffle service, RSS, internally). We have had RSS in production for a
>>>>> while and learned a lot along the way (we tried quite a few techniques to
>>>>> improve remote shuffle performance). We could share what we learned with
>>>>> the community, and would also like to hear feedback/suggestions on how to
>>>>> further improve remote shuffle performance. We could discuss in more detail if
>>>>> you or other people are interested.
>>>>>
>>>>> Best,
>>>>> Bo
>>>>>
>>>>
>>>> --
>>>> Ryan Blue
>>>> Software Engineer
>>>> Netflix
>>>>
>>>
>>>
>>> --
>>> John Zhuge
>>>
>>

-- 
John Zhuge

Re: Enabling fully disaggregated shuffle on Spark

Posted by bo yang <bo...@gmail.com>.
Cool, thanks Ryan, John, and Amogh for the replies! Great to see your interest!
Felix will hold a Spark Scalability & Reliability Sync meeting on Dec 4 at 1pm
PST. We could discuss more details there. Would you like to join?

On Tue, Nov 19, 2019 at 4:23 PM Amogh Margoor <am...@qubole.com> wrote:

> We at Qubole are also looking at disaggregating shuffle on Spark. Would
> love to collaborate and share learnings.
>
> Regards,
> Amogh
>
> On Tue, Nov 19, 2019 at 4:09 PM John Zhuge <jz...@apache.org> wrote:
>
>> Great work, Bo! Would love to hear the details.
>>
>>
>> On Tue, Nov 19, 2019 at 4:05 PM Ryan Blue <rb...@netflix.com.invalid>
>> wrote:
>>
>>> I'm interested in remote shuffle services as well. I'd love to hear
>>> about what you're using in production!
>>>
>>> rb
>>>
>>> On Tue, Nov 19, 2019 at 2:43 PM bo yang <bo...@gmail.com> wrote:
>>>
>>>> Hi Ben,
>>>>
>>>> Thanks for the write-up! This is Bo from Uber. I am on Felix's team
>>>> in Seattle, working on disaggregated shuffle (we call it remote
>>>> shuffle service, RSS, internally). We have had RSS in production for a
>>>> while and learned a lot along the way (we tried quite a few techniques to
>>>> improve remote shuffle performance). We could share what we learned with
>>>> the community, and would also like to hear feedback/suggestions on how to
>>>> further improve remote shuffle performance. We could discuss in more detail if
>>>> you or other people are interested.
>>>>
>>>> Best,
>>>> Bo
>>>>
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>>
>>
>>
>> --
>> John Zhuge
>>
>

Re: Enabling fully disaggregated shuffle on Spark

Posted by John Zhuge <jz...@apache.org>.
Great work, Bo! Would love to hear the details.


On Tue, Nov 19, 2019 at 4:05 PM Ryan Blue <rb...@netflix.com.invalid> wrote:

> I'm interested in remote shuffle services as well. I'd love to hear about
> what you're using in production!
>
> rb
>
> On Tue, Nov 19, 2019 at 2:43 PM bo yang <bo...@gmail.com> wrote:
>
>> Hi Ben,
>>
>> Thanks for the write-up! This is Bo from Uber. I am on Felix's team in
>> Seattle, working on disaggregated shuffle (we call it remote shuffle
>> service, RSS, internally). We have had RSS in production for a while and
>> learned a lot along the way (we tried quite a few techniques to improve
>> remote shuffle performance). We could share what we learned with the
>> community, and would also like to hear feedback/suggestions on how to
>> further improve remote shuffle performance. We could discuss in more detail if
>> you or other people are interested.
>>
>> Best,
>> Bo
>>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>


-- 
John Zhuge

Re: Enabling fully disaggregated shuffle on Spark

Posted by Ryan Blue <rb...@netflix.com.INVALID>.
I'm interested in remote shuffle services as well. I'd love to hear about
what you're using in production!

rb

On Tue, Nov 19, 2019 at 2:43 PM bo yang <bo...@gmail.com> wrote:

> Hi Ben,
>
> Thanks for the write-up! This is Bo from Uber. I am on Felix's team in
> Seattle, working on disaggregated shuffle (we call it remote shuffle
> service, RSS, internally). We have had RSS in production for a while and
> learned a lot along the way (we tried quite a few techniques to improve
> remote shuffle performance). We could share what we learned with the
> community, and would also like to hear feedback/suggestions on how to
> further improve remote shuffle performance. We could discuss in more detail if
> you or other people are interested.
>
> Best,
> Bo
>
> On Fri, Nov 15, 2019 at 4:10 PM Ben Sidhom <si...@google.com.invalid>
> wrote:
>
>> I would like to start a conversation about extending the Spark shuffle
>> manager surface to support fully disaggregated shuffle implementations.
>> This is closely related to the work in SPARK-25299
>> <https://issues.apache.org/jira/browse/SPARK-25299>, which is focused on
>> refactoring the shuffle manager API (and in particular, SortShuffleManager)
>> to use a pluggable storage backend. The motivation for that SPIP is further
>> enabling Spark on Kubernetes.
>>
>>
>> The motivation for this proposal is enabling fully externalized
>> (disaggregated) shuffle service implementations. (Facebook’s Cosco
>> shuffle
>> <https://databricks.com/session/cosco-an-efficient-facebook-scale-shuffle-service>
>> is one example of such a disaggregated shuffle service.) These changes
>> allow the bulk of the shuffle to run in a remote service such that minimal
>> state resides in executors and local disk spill is minimized. The net
>> effect is increased job stability and performance improvements in certain
>> scenarios. These changes should work well with or are complementary to
>> SPARK-25299. Some or all points may be merged into that issue as
>> appropriate.
>>
>>
>> Below is a description of each component of this proposal. These changes
>> can ideally be introduced incrementally. I would like to gather feedback
>> and gauge interest from others in the community to collaborate on this.
>> There are likely more points that would be useful to disaggregated shuffle
>> services. We can outline a more concrete plan after gathering enough input.
>> A working session could help us kick off this joint effort; maybe something
>> in the mid-January to mid-February timeframe (depending on interest and
>> availability). I’m happy to host at our Sunnyvale, CA offices.
>>
>>
>> Proposal
>> Scheduling and re-executing tasks
>>
>> Allow coordination between the service and the Spark DAG scheduler as to
>> whether a given block/partition needs to be recomputed when a task fails or
>> when shuffle block data cannot be read. Having such coordination is
>> important, e.g., for suppressing recomputation after aborted executors or
>> for forcing late recomputation if the service internally acts as a cache.
>> One catchall solution is to have the shuffle manager provide an indication
>> of whether shuffle data is external to executors (or nodes). Another
>> option: allow the shuffle manager (likely on the driver) to be queried for
>> the existence of shuffle data for a given executor ID (or perhaps map task,
>> reduce task, etc.). Note that this is at the level of data the scheduler is
>> aware of (i.e., map/reduce partitions) rather than block IDs, which are
>> internal details for some shuffle managers.
>>
>> ShuffleManager API
>>
>> Add a heartbeat (keep-alive) mechanism to RDD shuffle output so that the
>> service knows that data is still active. This is one way to enable
>> time-/job-scoped data because a disaggregated shuffle service cannot rely
>> on robust communication with Spark and in general has a distinct lifecycle
>> from the Spark deployment(s) it talks to. This would likely take the form
>> of a callback on ShuffleManager itself, but there are other approaches.
>>
>>
>> Add lifecycle hooks to shuffle readers and writers (e.g., to
>> close/recycle connections/streams/file handles as well as provide commit
>> semantics). SPARK-25299 adds commit semantics to the internal data storage
>> layer, but this is applicable to all shuffle managers at a higher level and
>> should apply equally to the ShuffleWriter.
>>
>>
>> Do not require ShuffleManagers to expose ShuffleBlockResolvers where they
>> are not needed. Ideally, this would be an implementation detail of the
>> shuffle manager itself. If there is substantial overlap between the
>> SortShuffleManager and other implementations, then the storage details can
>> be abstracted at the appropriate level. (SPARK-25299 does not currently
>> change this.)
>>
>>
>> Do not require MapStatus to include blockmanager IDs where they are not
>> relevant. This is captured by ShuffleBlockInfo
>> <https://docs.google.com/document/d/1d6egnL6WHOwWZe8MWv3m8n4PToNacdx7n_0iMSWwhCQ/edit#heading=h.imi27prnziyj>
>> including an optional BlockManagerId in SPARK-25299. However, this
>> change should be lifted to the MapStatus level so that it applies to all
>> ShuffleManagers. Alternatively, use a more general data-location
>> abstraction than BlockManagerId. This gives the shuffle manager more
>> flexibility and the scheduler more information with respect to data
>> residence.
>>
>> Serialization
>>
>> Allow serializers to be used more flexibly and efficiently. For example,
>> have serializers support writing an arbitrary number of objects into an
>> existing OutputStream or ByteBuffer. This enables objects to be serialized
>> to direct buffers where doing so makes sense. More importantly, it allows
>> arbitrary metadata/framing data to be wrapped around individual objects
>> cheaply. Right now, that’s only possible at the stream level. (There are
>> hacks around this, but this would enable more idiomatic use in efficient
>> shuffle implementations.)
>>
>>
>> Have serializers indicate whether they are deterministic. This provides
>> much of the value of a shuffle service because it means that reducers do
>> not need to spill to disk when reading/merging/combining inputs--the data
>> can be grouped by the service, even without the service understanding data
>> types or byte representations. Alternative (less preferable since it would
>> break Java serialization, for example): require all serializers to be
>> deterministic.
>>
>>
>>
>> --
>>
>> - Ben
>>
>

-- 
Ryan Blue
Software Engineer
Netflix

Re: Enabling fully disaggregated shuffle on Spark

Posted by bo yang <bo...@gmail.com>.
Hi Ben,

Thanks for the write-up! This is Bo from Uber. I am on Felix's team in
Seattle, working on disaggregated shuffle (we call it remote shuffle
service, RSS, internally). We have had RSS in production for a while and
learned a lot along the way (we tried quite a few techniques to improve
remote shuffle performance). We could share what we have learned with the
community, and would also like to hear feedback/suggestions on how to
further improve remote shuffle performance. We could chat in more detail if
you or other people are interested.

Best,
Bo

On Fri, Nov 15, 2019 at 4:10 PM Ben Sidhom <si...@google.com.invalid>
wrote:

> I would like to start a conversation about extending the Spark shuffle
> manager surface to support fully disaggregated shuffle implementations.
> This is closely related to the work in SPARK-25299
> <https://issues.apache.org/jira/browse/SPARK-25299>, which is focused on
> refactoring the shuffle manager API (and in particular, SortShuffleManager)
> to use a pluggable storage backend. The motivation for that SPIP is further
> enabling Spark on Kubernetes.
>
>
> The motivation for this proposal is enabling full externalized
> (disaggregated) shuffle service implementations. (Facebook’s Cosco shuffle
> <https://databricks.com/session/cosco-an-efficient-facebook-scale-shuffle-service>
> is one example of such a disaggregated shuffle service.) These changes
> allow the bulk of the shuffle to run in a remote service such that minimal
> state resides in executors and local disk spill is minimized. The net
> effect is increased job stability and performance improvements in certain
> scenarios. These changes should work well with or are complementary to
> SPARK-25299. Some or all points may be merged into that issue as
> appropriate.
>
>
> Below is a description of each component of this proposal. These changes
> can ideally be introduced incrementally. I would like to gather feedback
> and gauge interest from others in the community to collaborate on this.
> There are likely more points that would be useful to disaggregated shuffle
> services. We can outline a more concrete plan after gathering enough input.
> A working session could help us kick off this joint effort; maybe something
> in the mid-January to mid-February timeframe (depending on interest and
> availability). I’m happy to host at our Sunnyvale, CA offices.
>
>
> Proposal
> Scheduling and re-executing tasks
>
> Allow coordination between the service and the Spark DAG scheduler as to
> whether a given block/partition needs to be recomputed when a task fails or
> when shuffle block data cannot be read. Having such coordination is
> important, e.g., for suppressing recomputation after aborted executors or
> for forcing late recomputation if the service internally acts as a cache.
> One catchall solution is to have the shuffle manager provide an indication
> of whether shuffle data is external to executors (or nodes). Another
> option: allow the shuffle manager (likely on the driver) to be queried for
> the existence of shuffle data for a given executor ID (or perhaps map task,
> reduce task, etc.). Note that this is at the level of data the scheduler is
> aware of (i.e., map/reduce partitions) rather than block IDs, which are
> internal details for some shuffle managers.
>
> ShuffleManager API
>
> Add a heartbeat (keep-alive) mechanism to RDD shuffle output so that the
> service knows that data is still active. This is one way to enable
> time-/job-scoped data because a disaggregated shuffle service cannot rely
> on robust communication with Spark and in general has a distinct lifecycle
> from the Spark deployment(s) it talks to. This would likely take the form
> of a callback on ShuffleManager itself, but there are other approaches.
>
>
> Add lifecycle hooks to shuffle readers and writers (e.g., to close/recycle
> connections/streams/file handles as well as provide commit semantics).
> SPARK-25299 adds commit semantics to the internal data storage layer, but
> this is applicable to all shuffle managers at a higher level and should
> apply equally to the ShuffleWriter.
>
>
> Do not require ShuffleManagers to expose ShuffleBlockResolvers where they
> are not needed. Ideally, this would be an implementation detail of the
> shuffle manager itself. If there is substantial overlap between the
> SortShuffleManager and other implementations, then the storage details can
> be abstracted at the appropriate level. (SPARK-25299 does not currently
> change this.)
>
>
> Do not require MapStatus to include blockmanager IDs where they are not
> relevant. This is captured by ShuffleBlockInfo
> <https://docs.google.com/document/d/1d6egnL6WHOwWZe8MWv3m8n4PToNacdx7n_0iMSWwhCQ/edit#heading=h.imi27prnziyj>
> including an optional BlockManagerId in SPARK-25299. However, this change
> should be lifted to the MapStatus level so that it applies to all
> ShuffleManagers. Alternatively, use a more general data-location
> abstraction than BlockManagerId. This gives the shuffle manager more
> flexibility and the scheduler more information with respect to data
> residence.
>
> Serialization
>
> Allow serializers to be used more flexibly and efficiently. For example,
> have serializers support writing an arbitrary number of objects into an
> existing OutputStream or ByteBuffer. This enables objects to be serialized
> to direct buffers where doing so makes sense. More importantly, it allows
> arbitrary metadata/framing data to be wrapped around individual objects
> cheaply. Right now, that’s only possible at the stream level. (There are
> hacks around this, but this would enable more idiomatic use in efficient
> shuffle implementations.)
>
>
> Have serializers indicate whether they are deterministic. This provides
> much of the value of a shuffle service because it means that reducers do
> not need to spill to disk when reading/merging/combining inputs--the data
> can be grouped by the service, even without the service understanding data
> types or byte representations. Alternative (less preferable since it would
> break Java serialization, for example): require all serializers to be
> deterministic.
>
>
>
> --
>
> - Ben
>

Re: Enabling fully disaggregated shuffle on Spark

Posted by Ben Sidhom <si...@google.com.INVALID>.
Recapping today's sync on the wider dev list for visibility:

The original proposals here can be refactored into 3 distinct changes which
could be integrated iteratively. In order of decreasing priority:

   1. Allow MapStatus to take an arbitrary/opaque payload and rip out hard
   references to executor ids, etc. This lets shuffle implementations
   customize, e.g., block location specs and decouples shuffle results from
   executors/specific machines.
   2. Allow MapStatus to be dynamically updated by inserting RPC hooks in
   strategic places. Shuffle managers can then hook into these and, for
   example, invalidate shuffle data on external failure or notify the
   MapStatus tracker that asynchronous backups are ready. This replaces the
   scheduler changes proposed above.
   3. Deterministic/sort-consistent serializer APIs that allow key-wise
   aggregation/sorting server-side.

Point 1 is really a prerequisite for 2 since dynamic updates are only
useful to shuffle managers if they have the necessary data available. Point
3 is independent but also lower priority because it can be considered a
performance optimization but may require invasive changes to Spark (and
user code) to actually work.
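
To make points 1 and 2 a bit more concrete, here is a minimal Scala sketch.
It is not a proposed API: every name in it (GenericMapStatus,
RemoteMapStatus, StatusTracker, the comma-separated payload encoding) is
hypothetical and only illustrates a status that carries an opaque payload,
plus a tracker whose entries a shuffle manager could invalidate later.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import scala.collection.mutable

object OpaqueMapStatusSketch {
  // Hypothetical stand-in for a location-bound status: the scheduler sees
  // only an ID, sizes, and bytes that it does not interpret.
  trait GenericMapStatus {
    def mapId: Long
    def sizeForReducer(reduceId: Int): Long
    def payload: Array[Byte] // opaque to Spark, meaningful to the plugin
  }

  // One possible plugin-side implementation: output held on several servers.
  final case class RemoteMapStatus(
      mapId: Long,
      sizes: Vector[Long],
      servers: List[String]) extends GenericMapStatus {
    def sizeForReducer(reduceId: Int): Long = sizes(reduceId)
    def payload: Array[Byte] = servers.mkString(",").getBytes(UTF_8)
  }

  // Plugin-side decoding; the encoding is an assumption of this sketch.
  def decodeServers(payload: Array[Byte]): List[String] = {
    val text = new String(payload, UTF_8)
    if (text.isEmpty) Nil else text.split(",").toList
  }

  // Point 2: entries can be invalidated after they were registered,
  // e.g. when the external service reports lost data.
  final class StatusTracker {
    private val statuses = mutable.Map.empty[Long, GenericMapStatus]
    def register(status: GenericMapStatus): Unit =
      statuses.update(status.mapId, status)
    def invalidate(mapId: Long): Boolean = statuses.remove(mapId).isDefined
    def lookup(mapId: Long): Option[GenericMapStatus] = statuses.get(mapId)
  }
}
```

The point of the split is that only the plugin ever calls decodeServers;
the tracker stores and drops statuses without looking inside the payload.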

The tentative plan is to separate these efforts into 3 separate proposal
docs (possibly with discussion doc(s) while the details gel).
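
For point 3, a toy Scala sketch of why determinism matters for server-side
grouping. The two "serializers" below are made up for illustration and are
not Spark APIs; the only assumption is that the service groups records by
comparing raw key bytes.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8

object ByteWiseGroupingSketch {
  // Order-sensitive toy serializer: the bytes depend on entry order, so two
  // keys that are equal as maps can serialize differently.
  def serializeInOrder(entries: Seq[(String, Int)]): Array[Byte] =
    entries.map { case (k, v) => s"$k=$v" }.mkString(";").getBytes(UTF_8)

  // Deterministic variant: canonicalize (sort by field name) before writing.
  def serializeDeterministic(entries: Seq[(String, Int)]): Array[Byte] =
    serializeInOrder(entries.sortBy(_._1))

  // "Service side": sums values per key using only byte equality.
  // ByteBuffer.wrap gives content-based equals/hashCode for the key bytes.
  def groupByKeyBytes(records: Seq[(Array[Byte], Int)]): Map[ByteBuffer, Int] =
    records
      .groupBy { case (keyBytes, _) => ByteBuffer.wrap(keyBytes) }
      .map { case (key, group) => key -> group.map(_._2).sum }
}
```

With the order-sensitive serializer, the same logical key written by two
map tasks can land in two groups; with the deterministic one, the service
merges them without knowing anything about the key's type.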

On Fri, Dec 6, 2019 at 7:53 AM Li Hao <li...@gmail.com> wrote:

> I agree with Bo's idea that MapStatus could be a more generalized
> concept, not necessarily bound to BlockManager/Executor.
>
> As I understand it, MapStatus is used to track/record the output data
> location of a map task; it is created by the shuffle writer and used by
> the shuffle reader to find and read its shuffle data. So, if we want to
> keep using MapStatus to provide the same functionality across different
> shuffle implementations, it should be more generalized, so that different
> shuffle writers can encapsulate their own data location info in a
> MapStatus object and, similarly, different shuffle readers can retrieve
> that info from the MapStatus object.
>
> As I see it, there are two ways to make MapStatus more generalized:
> 1. Make MapStatus extensible (as Bo mentioned above, by making MapStatus a
> public non-sealed trait), so that each shuffle implementation can have its
> own MapStatus implementation.
> 2. Make the location in MapStatus a more general data-location identifier
> (as mentioned in Ben's proposal), maybe something like a URL, for example
> executor://host:port:mapid, dfs://path/to/data (which is the case in
> Baidu's disaggregated shuffle implementation), s3://path/to/data, or
> xxshuffleserver://host:port:dataid, so that each shuffle writer can
> encode its output data location into this URL and the reader will
> understand what the URL means and then find and read the shuffle data.
>
> These two ways do not conflict. We could use the second way to make
> MapStatus a more generalized concept that covers the various
> data-location representations in different shuffle implementations, and
> also use the first way to provide extensibility, so that shuffle writers
> can encapsulate more of their own output info into MapStatus: not just
> the data location, reduce sizes, and mapId in the current MapStatus trait,
> but also any other info needed by the reduce/shuffle reader side.
>
> Best regards,
> Li Hao
>
>
> Best regards,
> Li Hao
>
> On Thu, 5 Dec 2019 at 12:15, bo yang <bo...@gmail.com> wrote:
>
>> Thanks guys for the discussion in the email and also this afternoon!
>>
>> From our experience, we do not need to change Spark DAG scheduler to
>> implement a remote shuffle service. Current Spark shuffle manager
>> interfaces are pretty good and easy to implement. But we do feel the need
>> to modify MapStatus to make it more generic.
>>
>> The current limitation of MapStatus is that it assumes *a map output only
>> exists on a single executor* (see below). One easy update would be to make
>> MapStatus support the scenario where *a map output could be on multiple
>> remote servers*.
>>
>> private[spark] sealed trait MapStatus {
>>   def location: BlockManagerId
>> }
>>
>> class BlockManagerId private (
>>     private var executorId_ : String,
>>     private var host_ : String,
>>     private var port_ : Int)
>>
>> Also, MapStatus is a sealed trait, so our ShuffleManager plugin cannot
>> extend it with its own implementation. How about *making MapStatus a
>> public non-sealed trait*? Then different ShuffleManager plugins could
>> implement their own MapStatus classes.
>>
>> Best,
>> Bo
>>
>> On Wed, Dec 4, 2019 at 3:27 PM Ben Sidhom <si...@google.com.invalid>
>> wrote:
>>
>>> Hey Imran (and everybody who made it to the sync today):
>>>
>>> Thanks for the comments. Responses below:
>>>
>>> Scheduling and re-executing tasks
>>>>> Allow coordination between the service and the Spark DAG scheduler as
>>>>> to whether a given block/partition needs to be recomputed when a task fails
>>>>> or when shuffle block data cannot be read. Having such coordination is
>>>>> important, e.g., for suppressing recomputation after aborted executors or
>>>>> for forcing late recomputation if the service internally acts as a cache.
>>>>> One catchall solution is to have the shuffle manager provide an indication
>>>>> of whether shuffle data is external to executors (or nodes). Another
>>>>> option: allow the shuffle manager (likely on the driver) to be queried for
>>>>> the existence of shuffle data for a given executor ID (or perhaps map task,
>>>>> reduce task, etc). Note that this is at the level of data the scheduler is
>>>>> aware of (i.e., map/reduce partitions) rather than block IDs, which are
>>>>> internal details for some shuffle managers.
>>>>
>>>>
>>>> sounds reasonable, and I think @Matt Cheah  mentioned something like
>>>> this has come up with their work on SPARK-25299 and was going to be added
>>>> even for that work.  (of course, need to look at the actual proposal
>>>> closely and how it impacts the scheduler.)
>>>
>>>
>>> While this is something that was discussed before, it is not something
>>> that is *currently* in the scope of SPARK-25299. Given the number of
>>> parties who are doing async data pushes (either as a backup, as in the case
>>> of the proposal in SPARK-25299, or as the sole mechanism of data
>>> distribution), I expect this to be an issue at the forefront for many
>>> people. I have not yet written a specific proposal for how this should be
>>> done. Rather, I wanted to gauge how many others see this as an important
>>> issue and figure out the most reasonable solutions for the community as a
>>> whole. It sounds like people have been getting around this using hacks so far.
>>> I would be curious to hear what does and does not work well and which
>>> solutions we would be OK with in Spark upstream.
>>>
>>>
>>> ShuffleManager API
>>>>> Add a heartbeat (keep-alive) mechanism to RDD shuffle output so that
>>>>> the service knows that data is still active. This is one way to enable
>>>>> time-/job-scoped data because a disaggregated shuffle service cannot rely
>>>>> on robust communication with Spark and in general has a distinct lifecycle
>>>>> from the Spark deployment(s) it talks to. This would likely take the form
>>>>> of a callback on ShuffleManager itself, but there are other approaches.
>>>>
>>>> I believe this can already be done, but maybe it's much uglier than it
>>>> needs to be (though I don't recall the details off the top of my head).
>>>
>>>
>>> As far as I'm aware, this would need to be added out-of-band, e.g., by
>>> the ShuffleManager itself firing off its own heartbeat thread(s) (on the
>>> driver, executors, or both). While obviously this is possible, it's also
>>> prone to leaks and puts more burden on shuffle implementations. In fact, I
>>> don't have a robust understanding of the lifecycle of the ShuffleManager
>>> object itself. IIRC (from some ad-hoc tests I did a while back), a new one
>>> is spawned on each executor itself (as opposed to being instantiated once
>>> on the driver and deserialized onto executors). If executor
>>> (ShuffleManager) instances do not receive shutdown hooks, shuffle
>>> implementations may be prone to resource leaks. Worse, if the behavior of
>>> ShuffleManager instantiation is not stable between Spark releases, there
>>> may be correctness issues due to initializers/constructors running in
>>> unexpected ways. Then you have the ShuffleManager instance used for
>>> registration. As far as I can tell, this runs on the driver, but might this
>>> be migrated between machines (either now or in future Spark releases),
>>> e.g., in cluster mode?
>>>
>>> If this were taken care of by the Spark scheduler rather than the
>>> shuffle manager itself, we could avoid an entire class of subtle issues. My
>>> off-the-cuff suggestion above was to expose a callback on the
>>> ShuffleManager that allows implementations to define their own heartbeat
>>> logic. That could then be invoked by the scheduler when and where
>>> appropriate (along with any other lifecycle callbacks we might add).
>>>
>>> Add lifecycle hooks to shuffle readers and writers (e.g., to
>>>>> close/recycle connections/streams/file handles as well as provide commit
>>>>> semantics). SPARK-25299 adds commit semantics to the internal data storage
>>>>> layer, but this is applicable to all shuffle managers at a higher level and
>>>>> should apply equally to the ShuffleWriter.
>>>>
>>>>
>>>> ShuffleWriter has a
>>>>
>>>> def stop(success: Boolean): Option[MapStatus]
>>>>
>>>>  I would need more info about why that isn't enough.  (But if there is
>>>> a need for it, yes this makes sense.)
>>>
>>>
>>> That's probably fine for most purposes. However, that stop hook only
>>> exists on shuffle writers. What about on readers? Also, each
>>> reader/writer instance appears to be invoked only once for reading
>>> or writing. If ShuffleManagers can assume that behavior is stable, this
>>> point is less important. In any case, if we do intend to enable "external"
>>> shuffle implementations, we should make the APIs as explicit as possible
>>> and ensure we're enabling cleanup (and commits) wherever possible.
>>>
>>> Serialization
>>>>> Allow serializers to be used more flexibly and efficiently. For
>>>>> example, have serializers support writing an arbitrary number of objects
>>>>> into an existing OutputStream or ByteBuffer. This enables objects to be
>>>>> serialized to direct buffers where doing so makes sense. More importantly,
>>>>> it allows arbitrary metadata/framing data to be wrapped around individual
>>>>> objects cheaply. Right now, that’s only possible at the stream level.
>>>>> (There are hacks around this, but this would enable more idiomatic use in
>>>>> efficient shuffle implementations.)
>>>>
>>>> I don't really understand how this is different from the existing
>>>> SerializationStream -- probably a small example would clarify.
>>>
>>>
>>> I illustrated the use case poorly above. It *can* be worked around as
>>> of now, but not cleanly-and-efficiently (you *can* get one at a time).
>>> Consider shuffle implementations that do not dump raw stream data to some
>>> storage service but need to frame serialized objects in some way. They are
>>> stuck jumping through hoops with the current SerializationStream structure
>>> (e.g., instantiating a fake/wrapper OutputStream and serializer instance
>>> for each frame or doing even worse trickery to avoid that allocation
>>> penalty). If serializers could write to an *existing* byte array
>>> or---better yet---a ByteBuffer, then this song and dance could be avoided.
>>>
>>> I would advocate for ByteBuffers as a first-class data sink as a
>>> performance optimization. This confers 2 benefits:
>>>
>>>    - Users of asynchronous byte channels don't have to copy data
>>>    between arrays and buffers or give up asynchronicity.
>>>    - Direct buffers avoid excess data copies and kernel boundary jumps
>>>    when writing to certain sinks.
>>>
>>> Now that I think about it, this *could* equally benefit the SPARK-25299
>>> use case where channels are used.
>>>
>>> Have serializers indicate whether they are deterministic. This provides
>>>>> much of the value of a shuffle service because it means that reducers do
>>>>> not need to spill to disk when reading/merging/combining inputs--the data
>>>>> can be grouped by the service, even without the service understanding data
>>>>> types or byte representations. Alternative (less preferable since it would
>>>>> break Java serialization, for example): require all serializers to be
>>>>> deterministic.
>>>>
>>>> I really don't understand this one, sorry, can you elaborate more?  I'm
>>>> not sure what determinism has to do with spilling to disk.  There is
>>>> already supportsRelocationOfSerializedObjects, though that is private,
>>>> which seems related but I think you're talking about something else?
>>>
>>>
>>> First off, by deterministic serialization I mean literally that: one
>>> object (or two objects that are considered equal) will serialize to the
>>> same byte representation no matter when/how it is serialized. This point is
>>> about allowing external shuffle/merging services to operate on the
>>> key/value level without having to actually understand the byte
>>> representation of objects. Instead of merging *partitions*, shuffle
>>> managers can merge *data elements*. All of this can be done without
>>> shipping JVM Comparator functions (i.e., arbitrary code) to shuffle
>>> services.
>>>
>>> There are some dirty hacks/workarounds that can approximate this
>>> behavior even without strictly deterministic serialization, but we can only
>>> *guarantee* that shuffle readers (or writers for that matter) do not
>>> require local disk spill (no more local ExternalSorters) when we're working
>>> with deterministic serializers and a shuffle service that is aware of this.
>>>
>>> As far as I'm aware, supportsRelocationOfSerializedObjects only means
>>> that a given object can be moved around within a segment of serialized
>>> data. (For example, certain object graphs with cycles or other unusual data
>>> structures can be encoded but impose requirements on data stream ordering.)
>>> Note that serialized object relocation is a necessary but not sufficient
>>> condition for deterministic serialization (and spill-free shuffles).
>>>
>>>
>>>
>>> Anyway, there were a *lot* of people on the call today and we didn't
>>> get a chance to dig into the nitty-gritty details of these points. I would
>>> like to know what others think of these (not-fleshed-out) proposals, how
>>> they do (or do not) work with disaggregated shuffle implementations in the
>>> wild, and alternative workarounds that people have used so far. I'm
>>> particularly interested in learning how others have dealt with async writes
>>> and data reconciliation. Once I have that feedback, I'm happy to put out a
>>> more focused design doc that we can collect further comments on and iterate.
>>>
>>> On Wed, Dec 4, 2019 at 10:58 AM Imran Rashid
>>> <ir...@cloudera.com.invalid> wrote:
>>>
>>>> Hi Ben,
>>>>
>>>> in general everything you're proposing sounds reasonable.  For me, at
>>>> least, I'd need more details on most of the points before I fully
>>>> understand them, but I'm definitely in favor of the general goal for making
>>>> spark support fully disaggregated shuffle.  Of course, I also want to make
>>>> sure it can be done in a way that involves the least risky changes to spark
>>>> itself and that we can continue to support.
>>>>
>>>> One very-high level point which I think is worth keeping in mind for
>>>> the wider community following this -- the key difference between what you
>>>> are proposing and SPARK-25299, is that SPARK-25299 still uses spark's
>>>> existing shuffle implementation, which leverages local disk.  Your goal is
>>>> to better support shuffling all data via some external service, which
>>>> avoids shuffle data hitting executors local disks entirely.  This was
>>>> already possible, to some extent, even before SPARK-25299 with the
>>>> ShuffleManager api; but as you note, there are shortcomings which need to
>>>> be addressed.  (Historical note: that api wasn't designed with totally
>>>> distributed shuffle services in mind, it was to support hash- vs.
>>>> sort-based shuffle, all still on spark's executors.)
>>>>
>>>> One thing that I thought you would have needed, but you didn't mention
>>>> here, is changes to the scheduler to add an extra step between the
>>>> shuffle-write & shuffle-read stages, if it needs to do any work to
>>>> reorganize data.  I think I have heard this come up in prior discussions.
>>>>
>>>> A couple of inline comments below:
>>>>
>>>> On Fri, Nov 15, 2019 at 6:10 PM Ben Sidhom <si...@google.com.invalid>
>>>> wrote:
>>>>
>>>>> Proposal
>>>>> Scheduling and re-executing tasks
>>>>>
>>>>> Allow coordination between the service and the Spark DAG scheduler as
>>>>> to whether a given block/partition needs to be recomputed when a task fails
>>>>> or when shuffle block data cannot be read. Having such coordination is
>>>>> important, e.g., for suppressing recomputation after aborted executors or
>>>>> for forcing late recomputation if the service internally acts as a cache.
>>>>> One catchall solution is to have the shuffle manager provide an indication
>>>>> of whether shuffle data is external to executors (or nodes). Another
>>>>> option: allow the shuffle manager (likely on the driver) to be queried for
>>>>> the existence of shuffle data for a given executor ID (or perhaps map task,
>>>>> reduce task, etc). Note that this is at the level of data the scheduler is
>>>>> aware of (i.e., map/reduce partitions) rather than block IDs, which are
>>>>> internal details for some shuffle managers.
>>>>>
>>>>
>>>> sounds reasonable, and I think @Matt Cheah <mc...@palantir.com>
>>>> mentioned something like this has come up with their work on SPARK-25299
>>>> and was going to be added even for that work.  (of course, need to look at
>>>> the actual proposal closely and how it impacts the scheduler.)
>>>>
>>>>> ShuffleManager API
>>>>>
>>>>> Add a heartbeat (keep-alive) mechanism to RDD shuffle output so that
>>>>> the service knows that data is still active. This is one way to enable
>>>>> time-/job-scoped data because a disaggregated shuffle service cannot rely
>>>>> on robust communication with Spark and in general has a distinct lifecycle
>>>>> from the Spark deployment(s) it talks to. This would likely take the form
>>>>> of a callback on ShuffleManager itself, but there are other approaches.
>>>>>
>>>>
>>>> I believe this can already be done, but maybe it's much uglier than it
>>>> needs to be (though I don't recall the details off the top of my head).
>>>>
>>>>
>>>>> Add lifecycle hooks to shuffle readers and writers (e.g., to
>>>>> close/recycle connections/streams/file handles as well as provide commit
>>>>> semantics). SPARK-25299 adds commit semantics to the internal data storage
>>>>> layer, but this is applicable to all shuffle managers at a higher level and
>>>>> should apply equally to the ShuffleWriter.
>>>>>
>>>>
>>>> ShuffleWriter has a
>>>>
>>>> def stop(success: Boolean): Option[MapStatus]
>>>>
>>>>  I would need more info about why that isn't enough.  (But if there is
>>>> a need for it, yes this makes sense.)
>>>>
>>>>> Serialization
>>>>>
>>>>> Allow serializers to be used more flexibly and efficiently. For
>>>>> example, have serializers support writing an arbitrary number of objects
>>>>> into an existing OutputStream or ByteBuffer. This enables objects to be
>>>>> serialized to direct buffers where doing so makes sense. More importantly,
>>>>> it allows arbitrary metadata/framing data to be wrapped around individual
>>>>> objects cheaply. Right now, that’s only possible at the stream level.
>>>>> (There are hacks around this, but this would enable more idiomatic use in
>>>>> efficient shuffle implementations.)
>>>>>
>>>>
>>>> I don't really understand how this is different from the existing
>>>> SerializationStream -- probably a small example would clarify.
>>>>
>>>>
>>>>> Have serializers indicate whether they are deterministic. This
>>>>> provides much of the value of a shuffle service because it means that
>>>>> reducers do not need to spill to disk when reading/merging/combining
>>>>> inputs--the data can be grouped by the service, even without the service
>>>>> understanding data types or byte representations. Alternative (less
>>>>> preferable since it would break Java serialization, for example): require
>>>>> all serializers to be deterministic.
>>>>>
>>>>
>>>> I really don't understand this one, sorry, can you elaborate more?  I'm
>>>> not sure what determinism has to do with spilling to disk.  There is
>>>> already supportsRelocationOfSerializedObjects , though that is private,
>>>> which seems related but I think you're talking about something else?
>>>>
>>>> thanks,
>>>> Imran
>>>>
>>>>>
>>>
>>> --
>>> -Ben
>>>
>>

-- 
-Ben

Re: Enabling fully disaggregated shuffle on Spark

Posted by Li Hao <li...@gmail.com>.
I agree with Bo's idea that MapStatus could be a more generalized concept
that is not necessarily bound to BlockManager/Executor.

As I understand it, a MapStatus tracks/records the output data location of
a map task. It is created by the shuffle writer and used by the shuffle
reader to find and read its shuffle data. So, if we want to keep using
MapStatus to provide the same functionality across different shuffle
implementations, it should be more generalized, so that different shuffle
writers can encapsulate their own specific data-location info in a
MapStatus object and, similarly, different shuffle readers can retrieve
that info from the MapStatus object.

In my view, there are two ways to make MapStatus more generalized:
1. Make MapStatus extensible (as Bo suggested, by making MapStatus a
public non-sealed trait), so that different shuffle implementations can
have their own MapStatus implementation.
2. Make the location in MapStatus a more general data-location identifier
(as mentioned in Ben's proposal), perhaps something like a URL, for example
executor://host:port:mapid, dfs://path/to/data (which is the case in
Baidu's disaggregated shuffle implementation), s3://path/to/data, or
xxshuffleserver://host:port:dataid. Each shuffle writer could then encode
its output data location into this URL, and the reader, which understands
what the URL means, could find and read the shuffle data.

These two approaches do not conflict. We could use the second to make
MapStatus a more generalized concept that accommodates the various
data-location representations of different shuffle implementations, and
also use the first to provide extensibility, so that a shuffle writer can
encapsulate more of its own output info in MapStatus: not just the data
location, reduce sizes, and mapId in the current MapStatus trait, but also
any other info needed by the reduce/shuffle reader side.
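
To make the second option a bit more concrete, here is a rough sketch of
what a URI-based location could look like. Everything in it
(GenericMapStatus, UriMapStatus, LocationResolver, Resolvers) is a
hypothetical name made up for illustration, not an existing Spark API, and
the URI layouts are only assumptions. They differ slightly from the
examples above so that java.net.URI can parse host and port.

```scala
import java.net.URI

// Hypothetical sketch, NOT Spark's API: a MapStatus-like trait whose
// location is an opaque URI instead of a BlockManagerId.
trait GenericMapStatus {
  def mapId: Long
  def location: URI
  def getSizeForBlock(reduceId: Int): Long
}

final case class UriMapStatus(mapId: Long, location: URI, sizes: Vector[Long])
    extends GenericMapStatus {
  override def getSizeForBlock(reduceId: Int): Long = sizes(reduceId)
}

// Each shuffle reader only needs to understand the schemes it supports.
trait LocationResolver {
  def scheme: String
  def describe(location: URI): String
}

object DfsResolver extends LocationResolver {
  val scheme = "dfs"
  def describe(location: URI): String = s"read file ${location.getPath}"
}

object ExecutorResolver extends LocationResolver {
  val scheme = "executor"
  def describe(location: URI): String =
    s"fetch from ${location.getHost}:${location.getPort}${location.getPath}"
}

object Resolvers {
  private val byScheme: Map[String, LocationResolver] =
    Seq(DfsResolver, ExecutorResolver).map(r => r.scheme -> r).toMap

  // Returns None when no registered reader understands the URI scheme.
  def resolve(status: GenericMapStatus): Option[String] =
    byScheme.get(status.location.getScheme).map(_.describe(status.location))
}
```

With this shape, a writer would emit something like
UriMapStatus(12L, new URI("dfs:///shuffle/app-1/map-12"), sizes), and the
reader would dispatch on the scheme. A real design would also have to
settle how such a status is serialized and registered with the driver,
which this sketch ignores.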

Best regards,
Li Hao

On Thu, 5 Dec 2019 at 12:15, bo yang <bo...@gmail.com> wrote:

> Thanks guys for the discussion in the email and also this afternoon!
>
> From our experience, we do not need to change Spark DAG scheduler to
> implement a remote shuffle service. Current Spark shuffle manager
> interfaces are pretty good and easy to implement. But we do feel the need
> to modify MapStatus to make it more generic.
>
> The current limit with MapStatus is that it assumes* a map output only
> exists on a single executor* (see following). One easy update could be
> making MapStatus supports the scenario where *a map output could be on
> multiple remote servers*.
>
> private[spark] sealed trait MapStatus {
>   def location: BlockManagerId
>   // (other members omitted)
> }
>
> class BlockManagerId private (
>     private var executorId_ : String,
>     private var host_ : String,
>     private var port_ : Int
>     // (remaining constructor parameters omitted)
> )
>
> Also, MapStatus is a sealed trait, thus our ShuffleManager plugin could
> not extend it with our own implementation. How about *making MapStatus a
> public non-sealed trait*? So different Shuffle Manager plugin could
> implement their own MapStatus classes.
>
> Best,
> Bo
>
> On Wed, Dec 4, 2019 at 3:27 PM Ben Sidhom <si...@google.com.invalid>
> wrote:
>
>> Hey Imran (and everybody who made it to the sync today):
>>
>> Thanks for the comments. Responses below:
>>
>>>> Scheduling and re-executing tasks
>>>> Allow coordination between the service and the Spark DAG scheduler as
>>>> to whether a given block/partition needs to be recomputed when a task fails
>>>> or when shuffle block data cannot be read. Having such coordination is
>>>> important, e.g., for suppressing recomputation after aborted executors or
>>>> for forcing late recomputation if the service internally acts as a cache.
>>>> One catchall solution is to have the shuffle manager provide an indication
>>>> of whether shuffle data is external to executors (or nodes). Another
>>>> option: allow the shuffle manager (likely on the driver) to be queried for
>>>> the existence of shuffle data for a given executor ID (or perhaps map task,
>>>> reduce task, etc). Note that this is at the level of data the scheduler is
>>>> aware of (i.e., map/reduce partitions) rather than block IDs, which are
>>>> internal details for some shuffle managers.
>>>
>>>
>>> sounds reasonable, and I think @Matt Cheah  mentioned something like
>>> this has come up with their work on SPARK-25299 and was going to be added
>>> even for that work.  (of course, need to look at the actual proposal
>>> closely and how it impacts the scheduler.)
>>
>>
>> While this is something that was discussed before, it is not something
>> that is *currently* in the scope of SPARK-25299. Given the number of
>> parties who are doing async data pushes (either as a backup, as in the case
>> of the proposal in SPARK-25299, or as the sole mechanism of data
>> distribution), I expect this to be an issue at the forefront for many
>> people. I have not yet written a specific proposal for how this should be
>> done. Rather, I wanted to gauge how many others see this as an important
>> issue and figure out the most reasonable solutions for the community as a
>> whole. It sounds like people have been getting by this using hacks so far.
>> I would be curious to hear what does and does not work well and which
>> solutions we would be OK with in Spark upstream.
>>
>>
>>>> ShuffleManager API
>>>> Add a heartbeat (keep-alive) mechanism to RDD shuffle output so that
>>>> the service knows that data is still active. This is one way to enable
>>>> time-/job-scoped data because a disaggregated shuffle service cannot rely
>>>> on robust communication with Spark and in general has a distinct lifecycle
>>>> from the Spark deployment(s) it talks to. This would likely take the form
>>>> of a callback on ShuffleManager itself, but there are other approaches.
>>>
>>>
>>
>>> I believe this can already be done, but maybe it's much uglier than it
>>> needs to be (though I don't recall the details off the top of my head).
>>
>>
>> As far as I'm aware, this would need to be added out-of-band, e.g., by
>> the ShuffleManager itself firing off its own heartbeat thread(s) (on the
>> driver, executors, or both). While obviously this is possible, it's also
>> prone to leaks and puts more burden on shuffle implementations. In fact, I
>> don't have a robust understanding of the lifecycle of the ShuffleManager
>> object itself. IIRC (from some ad-hoc tests I did a while back), a new one
>> is spawned on each executor itself (as opposed to being instantiated once
>> on the driver and deserialized onto executors). If executor
>> (ShuffleManager) instances do not receive shutdown hooks, shuffle
>> implementations may be prone to resource leaks. Worse, if the behavior of
>> ShuffleManager instantiation is not stable between Spark releases, there
>> may be correctness issues due to initializers/constructors running in
>> unexpected ways. Then you have the ShuffleManager instance used for
>> registration. As far as I can tell, this runs on the driver, but might this
>> be migrated between machines (either now or in future Spark releases),
>> e.g., in cluster mode?
>>
>> If this were taken care of by the Spark scheduler rather than the shuffle
>> manager itself, we could avoid an entire class of subtle issues. My
>> off-the-cuff suggestion above was to expose a callback on the
>> ShuffleManager that allows implementations to define their own heartbeat
>> logic. That could then be invoked by the scheduler when and where
>> appropriate (along with any other lifecycle callbacks we might add).
>>
>>>> Add lifecycle hooks to shuffle readers and writers (e.g., to
>>>> close/recycle connections/streams/file handles as well as provide commit
>>>> semantics). SPARK-25299 adds commit semantics to the internal data storage
>>>> layer, but this is applicable to all shuffle managers at a higher level and
>>>> should apply equally to the ShuffleWriter.
>>>
>>>
>>> ShuffleWriter has a
>>>
>>> def stop(success: Boolean): Option[MapStatus]
>>>
>>>  I would need more info about why that isn't enough.  (But if there is a
>>> need for it, yes this makes sense.)
>>
>>
>> That's probably fine for most purposes. However, that stop hook only
>> exists on shuffle writers. What about on readers? In any case, each
>> reader/writer instance appears to only be invoked once for reading
>> or writing. If ShuffleManagers can assume that behavior is stable, this
>> point is less important. In any case, if we do intend to enable "external"
>> shuffle implementations, we should make the APIs as explicit as possible
>> and ensure we're enabling cleanup (and commits) wherever possible.
>>
>>>> Serialization
>>>> Allow serializers to be used more flexibly and efficiently. For
>>>> example, have serializers support writing an arbitrary number of objects
>>>> into an existing OutputStream or ByteBuffer. This enables objects to be
>>>> serialized to direct buffers where doing so makes sense. More importantly,
>>>> it allows arbitrary metadata/framing data to be wrapped around individual
>>>> objects cheaply. Right now, that’s only possible at the stream level.
>>>> (There are hacks around this, but this would enable more idiomatic use in
>>>> efficient shuffle implementations.)
>>>
>>>
>>
>>> I don't really understand how this is different from the existing
>>> SerializationStream -- probably a small example would clarify.
>>
>>
>> I illustrated the use case poorly above. It *can* be worked around as of
>> now, but not cleanly-and-efficiently (you *can* get one at a time).
>> Consider shuffle implementations that do not dump raw stream data to some
>> storage service but need to frame serialized objects in some way. They are
>> stuck jumping through hoops with the current SerializationStream structure
>> (e.g., instantiating a fake/wrapper OutputStream and serializer instance
>> for each frame or doing even worse trickery to avoid that allocation
>> penalty). If serializers could write to an *existing* byte array
>> or---better yet---a ByteBuffer, then this song and dance could be avoided.
>>
>> I would advocate for ByteBuffers as a first-class data sink as a
>> performance optimization. This confers 2 benefits:
>>
>>    - Users of asynchronous byte channels don't have to copy data between
>>    arrays and buffers or give up asynchronicity.
>>    - Direct buffers avoid excess data copies and kernel boundary jumps
>>    when writing to certain sinks.
>>
>> Now that I think about it, this *could* equally benefit the SPARK-25299
>> use case where channels are used.
>>
>>>> Have serializers indicate whether they are deterministic. This provides
>>>> much of the value of a shuffle service because it means that reducers do
>>>> not need to spill to disk when reading/merging/combining inputs--the data
>>>> can be grouped by the service, even without the service understanding data
>>>> types or byte representations. Alternative (less preferable since it would
>>>> break Java serialization, for example): require all serializers to be
>>>> deterministic.
>>>
>>>
>>
>>> I really don't understand this one, sorry, can you elaborate more?  I'm
>>> not sure what determinism has to do with spilling to disk.  There is
>>> already supportsRelocationOfSerializedObjects , though that is private,
>>> which seems related but I think you're talking about something else?
>>
>>
>> First off, by deterministic serialization I mean literally that: one
>> object (or two objects that are considered equal) will serialize to the
>> same byte representation no matter when/how it is serialized. This point is
>> about allowing external shuffle/merging services to operate on the
>> key/value level without having to actually understand the byte
>> representation of objects. Instead of merging *partitions*, shuffle
>> managers can merge *data elements*. All of this can be done without
>> shipping JVM Comparator functions (i.e., arbitrary code) to shuffle
>> services.
>>
>> There are some dirty hacks/workarounds that can approximate this behavior
>> even without strictly deterministic serialization, but we can only
>> *guarantee* that shuffle readers (or writers for that matter) do not
>> require local disk spill (no more local ExternalSorters) when we're working
>> with deterministic serializers and a shuffle service that understands so.
>>
>> As far as I'm aware, supportsRelocationOfSerializedObjects only means
>> that a given object can be moved around within a segment of serialized
>> data. (For example, certain object graphs with cycles or other unusual data
>> structures can be encoded but impose requirements on data stream ordering.)
>> Note that serialized object relocation is a necessary but not sufficient
>> condition for deterministic serialization (and spill-free shuffles).
>>
>>
>>
>> Anyway, there were a *lot* of people on the call today and we didn't get
>> a chance to dig into the nitty-gritty details of these points. I would like
>> to know what others think of these (not-fleshed-out) proposals, how they do
>> (or do not) work with disaggregated shuffle implementations in the wild,
>> and alternative workarounds that people have used so far. I'm particularly
>> interested in learning how others have dealt with async writes and data
>> reconciliation. Once I have that feedback, I'm happy to put out a more
>> focused design doc that we can collect further comments on and iterate.
>>
>> On Wed, Dec 4, 2019 at 10:58 AM Imran Rashid <ir...@cloudera.com.invalid>
>> wrote:
>>
>>> Hi Ben,
>>>
>>> in general everything you're proposing sounds reasonable.  For me, at
>>> least, I'd need more details on most of the points before I fully
>>> understand them, but I'm definitely in favor of the general goal for making
>>> spark support fully disaggregated shuffle.  Of course, I also want to make
>>> sure it can be done in a way that involves the least risky changes to spark
>>> itself and we can continue to support.
>>>
>>> One very-high level point which I think is worth keeping in mind for the
>>> wider community following this -- the key difference between what you are
>>> proposing and SPARK-25299, is that SPARK-25299 still uses spark's existing
>>> shuffle implementation, which leverages local disk.  Your goal is to better
>>> support shuffling all data via some external service, which avoids shuffle
>>> data hitting executors local disks entirely.  This was already possible, to
>>> some extent, even before SPARK-25299 with the ShuffleManager api; but as
>>> you note, there are shortcomings which need to be addressed.  (Historical
>>> note: that api wasn't designed with totally distributed shuffle services in
>>> mind, it was to support hash- vs. sort-based shuffle, all still on spark's
>>> executors.)
>>>
>>> One thing that I thought you would have needed, but you didn't mention
>>> here, is changes to the scheduler to add an extra step between the
>>> shuffle-write & shuffle-read stages, if it needs to do any work to
>>> reorganize data, I think I have heard this come up in prior discussions.
>>>
>>> A couple of inline comments below:
>>>
>>> On Fri, Nov 15, 2019 at 6:10 PM Ben Sidhom <si...@google.com.invalid>
>>> wrote:
>>>
>>>> Proposal
>>>> Scheduling and re-executing tasks
>>>>
>>>> Allow coordination between the service and the Spark DAG scheduler as
>>>> to whether a given block/partition needs to be recomputed when a task fails
>>>> or when shuffle block data cannot be read. Having such coordination is
>>>> important, e.g., for suppressing recomputation after aborted executors or
>>>> for forcing late recomputation if the service internally acts as a cache.
>>>> One catchall solution is to have the shuffle manager provide an indication
>>>> of whether shuffle data is external to executors (or nodes). Another
>>>> option: allow the shuffle manager (likely on the driver) to be queried for
>>>> the existence of shuffle data for a given executor ID (or perhaps map task,
>>>> reduce task, etc). Note that this is at the level of data the scheduler is
>>>> aware of (i.e., map/reduce partitions) rather than block IDs, which are
>>>> internal details for some shuffle managers.
>>>>
>>>
>>> sounds reasonable, and I think @Matt Cheah <mc...@palantir.com>
>>> mentioned something like this has come up with their work on SPARK-25299
>>> and was going to be added even for that work.  (of course, need to look at
>>> the actual proposal closely and how it impacts the scheduler.)
>>>
>>>> ShuffleManager API
>>>>
>>>> Add a heartbeat (keep-alive) mechanism to RDD shuffle output so that
>>>> the service knows that data is still active. This is one way to enable
>>>> time-/job-scoped data because a disaggregated shuffle service cannot rely
>>>> on robust communication with Spark and in general has a distinct lifecycle
>>>> from the Spark deployment(s) it talks to. This would likely take the form
>>>> of a callback on ShuffleManager itself, but there are other approaches.
>>>>
>>>
>>> I believe this can already be done, but maybe it's much uglier than it
>>> needs to be (though I don't recall the details off the top of my head).
>>>
>>>
>>>> Add lifecycle hooks to shuffle readers and writers (e.g., to
>>>> close/recycle connections/streams/file handles as well as provide commit
>>>> semantics). SPARK-25299 adds commit semantics to the internal data storage
>>>> layer, but this is applicable to all shuffle managers at a higher level and
>>>> should apply equally to the ShuffleWriter.
>>>>
>>>
>>> ShuffleWriter has a
>>>
>>> def stop(success: Boolean): Option[MapStatus]
>>>
>>>  I would need more info about why that isn't enough.  (But if there is a
>>> need for it, yes this makes sense.)
>>>
>>>> Serialization
>>>>
>>>> Allow serializers to be used more flexibly and efficiently. For
>>>> example, have serializers support writing an arbitrary number of objects
>>>> into an existing OutputStream or ByteBuffer. This enables objects to be
>>>> serialized to direct buffers where doing so makes sense. More importantly,
>>>> it allows arbitrary metadata/framing data to be wrapped around individual
>>>> objects cheaply. Right now, that’s only possible at the stream level.
>>>> (There are hacks around this, but this would enable more idiomatic use in
>>>> efficient shuffle implementations.)
>>>>
>>>
>>> I don't really understand how this is different from the existing
>>> SerializationStream -- probably a small example would clarify.
>>>
>>>
>>>> Have serializers indicate whether they are deterministic. This provides
>>>> much of the value of a shuffle service because it means that reducers do
>>>> not need to spill to disk when reading/merging/combining inputs--the data
>>>> can be grouped by the service, even without the service understanding data
>>>> types or byte representations. Alternative (less preferable since it would
>>>> break Java serialization, for example): require all serializers to be
>>>> deterministic.
>>>>
>>>
>>> I really don't understand this one, sorry, can you elaborate more?  I'm
>>> not sure what determinism has to do with spilling to disk.  There is
>>> already supportsRelocationOfSerializedObjects , though that is private,
>>> which seems related but I think you're talking about something else?
>>>
>>> thanks,
>>> Imran
>>>
>>>>
>>
>> --
>> -Ben
>>
>

Re: Enabling fully disaggregated shuffle on Spark

Posted by bo yang <bo...@gmail.com>.
Thanks guys for the discussion in the email and also this afternoon!

From our experience, we do not need to change the Spark DAG scheduler to
implement a remote shuffle service. The current Spark shuffle manager
interfaces are pretty good and easy to implement. But we do feel the need
to modify MapStatus to make it more generic.

The current limitation of MapStatus is that it assumes *a map output only
exists on a single executor* (see below). One easy update would be to make
MapStatus support the scenario where *a map output could be on multiple
remote servers*.

private[spark] sealed trait MapStatus {
  def location: BlockManagerId
  // (other members omitted)
}

class BlockManagerId private (
    private var executorId_ : String,
    private var host_ : String,
    private var port_ : Int
    // (remaining constructor parameters omitted)
)

Also, MapStatus is a sealed trait, so our ShuffleManager plugin cannot
extend it with our own implementation. How about *making MapStatus a public
non-sealed trait*? Then different ShuffleManager plugins could implement
their own MapStatus classes.
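
As a rough illustration of what a plugin could do with a non-sealed trait,
here is a minimal sketch. OpenMapStatus, ServerAddress, and RemoteMapStatus
are hypothetical names invented for this example (they are not Spark
classes), and the per-partition replica list is just one assumed way to
model "a map output on multiple remote servers".

```scala
// Hypothetical sketch, NOT Spark's API: an open (non-sealed) trait that a
// ShuffleManager plugin could implement itself.
trait OpenMapStatus {
  def getSizeForBlock(reduceId: Int): Long
}

// Assumed address type for a remote shuffle server.
final case class ServerAddress(host: String, port: Int)

// A plugin-defined status: the data for each reduce partition may live on
// a different remote server, or on several replicated ones.
final case class RemoteMapStatus(
    mapId: Long,
    sizes: Vector[Long],
    replicas: Vector[Seq[ServerAddress]]) extends OpenMapStatus {
  require(sizes.length == replicas.length, "one replica list per reduce partition")

  override def getSizeForBlock(reduceId: Int): Long = sizes(reduceId)

  // Servers a reducer may contact to fetch this map task's output.
  def serversFor(reduceId: Int): Seq[ServerAddress] = replicas(reduceId)
}
```

A reader in the same plugin would then call serversFor(reduceId) instead of
relying on a single BlockManagerId. Whether the scheduler and
MapOutputTracker could treat such a status opaquely is a separate question
that this sketch does not answer.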

Best,
Bo

On Wed, Dec 4, 2019 at 3:27 PM Ben Sidhom <si...@google.com.invalid> wrote:

> Hey Imran (and everybody who made it to the sync today):
>
> Thanks for the comments. Responses below:
>
>>> Scheduling and re-executing tasks
>>> Allow coordination between the service and the Spark DAG scheduler as to
>>> whether a given block/partition needs to be recomputed when a task fails or
>>> when shuffle block data cannot be read. Having such coordination is
>>> important, e.g., for suppressing recomputation after aborted executors or
>>> for forcing late recomputation if the service internally acts as a cache.
>>> One catchall solution is to have the shuffle manager provide an indication
>>> of whether shuffle data is external to executors (or nodes). Another
>>> option: allow the shuffle manager (likely on the driver) to be queried for
>>> the existence of shuffle data for a given executor ID (or perhaps map task,
>>> reduce task, etc). Note that this is at the level of data the scheduler is
>>> aware of (i.e., map/reduce partitions) rather than block IDs, which are
>>> internal details for some shuffle managers.
>>
>>
>> sounds reasonable, and I think @Matt Cheah  mentioned something like this
>> has come up with their work on SPARK-25299 and was going to be added even
>> for that work.  (of course, need to look at the actual proposal closely and
>> how it impacts the scheduler.)
>
>
> While this is something that was discussed before, it is not something
> that is *currently* in the scope of SPARK-25299. Given the number of
> parties who are doing async data pushes (either as a backup, as in the case
> of the proposal in SPARK-25299, or as the sole mechanism of data
> distribution), I expect this to be an issue at the forefront for many
> people. I have not yet written a specific proposal for how this should be
> done. Rather, I wanted to gauge how many others see this as an important
> issue and figure out the most reasonable solutions for the community as a
> whole. It sounds like people have been getting by this using hacks so far.
> I would be curious to hear what does and does not work well and which
> solutions we would be OK with in Spark upstream.
>
>
>>> ShuffleManager API
>>> Add a heartbeat (keep-alive) mechanism to RDD shuffle output so that the
>>> service knows that data is still active. This is one way to enable
>>> time-/job-scoped data because a disaggregated shuffle service cannot rely
>>> on robust communication with Spark and in general has a distinct lifecycle
>>> from the Spark deployment(s) it talks to. This would likely take the form
>>> of a callback on ShuffleManager itself, but there are other approaches.
>>
>>
>
>> I believe this can already be done, but maybe it's much uglier than it
>> needs to be (though I don't recall the details off the top of my head).
>
>
> As far as I'm aware, this would need to be added out-of-band, e.g., by the
> ShuffleManager itself firing off its own heartbeat thread(s) (on the
> driver, executors, or both). While obviously this is possible, it's also
> prone to leaks and puts more burden on shuffle implementations. In fact, I
> don't have a robust understanding of the lifecycle of the ShuffleManager
> object itself. IIRC (from some ad-hoc tests I did a while back), a new one
> is spawned on each executor itself (as opposed to being instantiated once
> on the driver and deserialized onto executors). If executor
> (ShuffleManager) instances do not receive shutdown hooks, shuffle
> implementations may be prone to resource leaks. Worse, if the behavior of
> ShuffleManager instantiation is not stable between Spark releases, there
> may be correctness issues due to initializers/constructors running in
> unexpected ways. Then you have the ShuffleManager instance used for
> registration. As far as I can tell, this runs on the driver, but might this
> be migrated between machines (either now or in future Spark releases),
> e.g., in cluster mode?
>
> If this were taken care of by the Spark scheduler rather than the shuffle
> manager itself, we could avoid an entire class of subtle issues. My
> off-the-cuff suggestion above was to expose a callback on the
> ShuffleManager that allows implementations to define their own heartbeat
> logic. That could then be invoked by the scheduler when and where
> appropriate (along with any other lifecycle callbacks we might add).
>
>>> Add lifecycle hooks to shuffle readers and writers (e.g., to close/recycle
>>> connections/streams/file handles as well as provide commit semantics).
>>> SPARK-25299 adds commit semantics to the internal data storage layer, but
>>> this is applicable to all shuffle managers at a higher level and should
>>> apply equally to the ShuffleWriter.
>>
>>
>> ShuffleWriter has a
>>
>> def stop(success: Boolean): Option[MapStatus]
>>
>>  I would need more info about why that isn't enough.  (But if there is a
>> need for it, yes this makes sense.)
>
>
> That's probably fine for most purposes. However, that stop hook only
> exists on shuffle writers. What about on readers? In any case, each
> reader/writer instance appears to only be invoked once for reading
> or writing. If ShuffleManagers can assume that behavior is stable, this
> point is less important. In any case, if we do intend to enable "external"
> shuffle implementations, we should make the APIs as explicit as possible
> and ensure we're enabling cleanup (and commits) wherever possible.
>
>>> Serialization
>>> Allow serializers to be used more flexibly and efficiently. For example,
>>> have serializers support writing an arbitrary number of objects into an
>>> existing OutputStream or ByteBuffer. This enables objects to be serialized
>>> to direct buffers where doing so makes sense. More importantly, it allows
>>> arbitrary metadata/framing data to be wrapped around individual objects
>>> cheaply. Right now, that’s only possible at the stream level. (There are
>>> hacks around this, but this would enable more idiomatic use in efficient
>>> shuffle implementations.)
>>
>>
>
>> I don't really understand how this is different from the existing
>> SerializationStream -- probably a small example would clarify.
>
>
> I illustrated the use case poorly above. It *can* be worked around as of
> now, but not cleanly-and-efficiently (you *can* get one at a time).
> Consider shuffle implementations that do not dump raw stream data to some
> storage service but need to frame serialized objects in some way. They are
> stuck jumping through hoops with the current SerializationStream structure
> (e.g., instantiating a fake/wrapper OutputStream and serializer instance
> for each frame or doing even worse trickery to avoid that allocation
> penalty). If serializers could write to an *existing* byte array
> or---better yet---a ByteBuffer, then this song and dance could be avoided.
>
> I would advocate for ByteBuffers as a first-class data sink as a
> performance optimization. This confers 2 benefits:
>
>    - Users of asynchronous byte channels don't have to copy data between
>    arrays and buffers or give up asynchronicity.
>    - Direct buffers avoid excess data copies and kernel boundary jumps
>    when writing to certain sinks.
>
> Now that I think about it, this *could* equally benefit the SPARK-25299
> use case where channels are used.
>
>>> Have serializers indicate whether they are deterministic. This provides
>>> much of the value of a shuffle service because it means that reducers do
>>> not need to spill to disk when reading/merging/combining inputs--the data
>>> can be grouped by the service, even without the service understanding data
>>> types or byte representations. Alternative (less preferable since it would
>>> break Java serialization, for example): require all serializers to be
>>> deterministic.
>>
>>
>
>> I really don't understand this one, sorry, can you elaborate more?  I'm
>> not sure what determinism has to do with spilling to disk.  There is
>> already supportsRelocationOfSerializedObjects , though that is private,
>> which seems related but I think you're talking about something else?
>
>
> First off, by deterministic serialization I mean literally that: one
> object (or two objects that are considered equal) will serialize to the
> same byte representation no matter when/how it is serialized. This point is
> about allowing external shuffle/merging services to operate on the
> key/value level without having to actually understand the byte
> representation of objects. Instead of merging *partitions*, shuffle
> managers can merge *data elements*. All of this can be done without
> shipping JVM Comparator functions (i.e., arbitrary code) to shuffle
> services.
>
> There are some dirty hacks/workarounds that can approximate this behavior
> even without strictly deterministic serialization, but we can only
> *guarantee* that shuffle readers (or writers for that matter) do not
> require local disk spill (no more local ExternalSorters) when we're working
> with deterministic serializers and a shuffle service that understands so.
>
> As far as I'm aware, supportsRelocationOfSerializedObjects only means that
> a given object can be moved around within a segment of serialized data.
> (For example, certain object graphs with cycles or other unusual data
> structures can be encoded but impose requirements on data stream ordering.)
> Note that serialized object relocation is a necessary but not sufficient
> condition for deterministic serialization (and spill-free shuffles).
>
>
>
> Anyway, there were a *lot* of people on the call today and we didn't get
> a chance to dig into the nitty-gritty details of these points. I would like
> to know what others think of these (not-fleshed-out) proposals, how they do
> (or do not) work with disaggregated shuffle implementations in the wild,
> and alternative workarounds that people have used so far. I'm particularly
> interested in learning how others have dealt with async writes and data
> reconciliation. Once I have that feedback, I'm happy to put out a more
> focused design doc that we can collect further comments on and iterate.
>
> On Wed, Dec 4, 2019 at 10:58 AM Imran Rashid <ir...@cloudera.com.invalid>
> wrote:
>
>> Hi Ben,
>>
>> in general everything you're proposing sounds reasonable.  For me, at
>> least, I'd need more details on most of the points before I fully
>> understand them, but I'm definitely in favor of the general goal for making
>> spark support fully disaggregated shuffle.  Of course, I also want to make
>> sure it can be done in a way that involves the least risky changes to spark
>> itself and we can continue to support.
>>
>> One very-high level point which I think is worth keeping in mind for the
>> wider community following this -- the key difference between what you are
>> proposing and SPARK-25299, is that SPARK-25299 still uses spark's existing
>> shuffle implementation, which leverages local disk.  Your goal is to better
>> support shuffling all data via some external service, which avoids shuffle
>> data hitting executors local disks entirely.  This was already possible, to
>> some extent, even before SPARK-25299 with the ShuffleManager api; but as
>> you note, there are shortcomings which need to be addressed.  (Historical
>> note: that api wasn't designed with totally distributed shuffle services in
>> mind, it was to support hash- vs. sort-based shuffle, all still on spark's
>> executors.)
>>
>> One thing that I thought you would have needed, but you didn't mention
>> here, is changes to the scheduler to add an extra step between the
>> shuffle-write & shuffle-read stages, if it needs to do any work to
>> reorganize data, I think I have heard this come up in prior discussions.
>>
>> A couple of inline comments below:
>>
>> On Fri, Nov 15, 2019 at 6:10 PM Ben Sidhom <si...@google.com.invalid>
>> wrote:
>>
>>> Proposal
>>> Scheduling and re-executing tasks
>>>
>>> Allow coordination between the service and the Spark DAG scheduler as to
>>> whether a given block/partition needs to be recomputed when a task fails or
>>> when shuffle block data cannot be read. Having such coordination is
>>> important, e.g., for suppressing recomputation after aborted executors or
>>> for forcing late recomputation if the service internally acts as a cache.
>>> One catchall solution is to have the shuffle manager provide an indication
>>> of whether shuffle data is external to executors (or nodes). Another
>>> option: allow the shuffle manager (likely on the driver) to be queried for
>>> the existence of shuffle data for a given executor ID (or perhaps map task,
>>> reduce task, etc). Note that this is at the level of data the scheduler is
>>> aware of (i.e., map/reduce partitions) rather than block IDs, which are
>>> internal details for some shuffle managers.
>>>
>>
>> sounds reasonable, and I think @Matt Cheah <mc...@palantir.com>
>> mentioned something like this has come up with their work on SPARK-25299
>> and was going to be added even for that work.  (of course, need to look at
>> the actual proposal closely and how it impacts the scheduler.)
>>
>>> ShuffleManager API
>>>
>>> Add a heartbeat (keep-alive) mechanism to RDD shuffle output so that the
>>> service knows that data is still active. This is one way to enable
>>> time-/job-scoped data because a disaggregated shuffle service cannot rely
>>> on robust communication with Spark and in general has a distinct lifecycle
>>> from the Spark deployment(s) it talks to. This would likely take the form
>>> of a callback on ShuffleManager itself, but there are other approaches.
>>>
>>
>> I believe this can already be done, but maybe its much uglier than it
>> needs to be (though I don't recall the details off the top of my head).
>>
>>
>>> Add lifecycle hooks to shuffle readers and writers (e.g., to
>>> close/recycle connections/streams/file handles as well as provide commit
>>> semantics). SPARK-25299 adds commit semantics to the internal data storage
>>> layer, but this is applicable to all shuffle managers at a higher level and
>>> should apply equally to the ShuffleWriter.
>>>
>>
>> ShuffleWriter has a
>>
>> def stop(success: Boolean): Option[MapStatus]
>>
>>  I would need more info about why that isn't enough.  (But if there is a
>> need for it, yes this makes sense.)
>>
>>> Serialization
>>>
>>> Allow serializers to be used more flexibly and efficiently. For example,
>>> have serializers support writing an arbitrary number of objects into an
>>> existing OutputStream or ByteBuffer. This enables objects to be serialized
>>> to direct buffers where doing so makes sense. More importantly, it allows
>>> arbitrary metadata/framing data to be wrapped around individual objects
>>> cheaply. Right now, that’s only possible at the stream level. (There are
>>> hacks around this, but this would enable more idiomatic use in efficient
>>> shuffle implementations.)
>>>
>>
>> I don't really understand how this is different from the existing
>> SerializationStream -- probably a small example would clarify.
>>
>>
>>> Have serializers indicate whether they are deterministic. This provides
>>> much of the value of a shuffle service because it means that reducers do
>>> not need to spill to disk when reading/merging/combining inputs--the data
>>> can be grouped by the service, even without the service understanding data
>>> types or byte representations. Alternative (less preferable since it would
>>> break Java serialization, for example): require all serializers to be
>>> deterministic.
>>>
>>
>> I really don't understand this one, sorry, can you elaborate more?  I'm
>> not sure what determinism has to do with spilling to disk.  There is
>> already supportsRelocationOfSerializedObjects , though that is private,
>> which seems related but I think you're talking about something else?
>>
>> thanks,
>> Imran
>>
>>>
>
> --
> -Ben
>

Re: Enabling fully disaggregated shuffle on Spark

Posted by Imran Rashid <ir...@cloudera.com.INVALID>.
> Anyway, there were a *lot* of people on the call today and we didn't get
> a chance to dig into the nitty-gritty details of these points. I would like
> to know what others think of these (not-fleshed-out) proposals, how they do
> (or do not) work with disaggregated shuffle implementations in the wild,
> and alternative workarounds that people have used so far. I'm particularly
> interested in learning how others have dealt with async writes and data
> reconciliation. Once I have that feedback, I'm happy to put out a more
> focused design doc that we can collect further comments on and iterate.

yes, I agree, this makes sense -- there are a lot of different topics here
and one email thread will quickly get unwieldy, I think.  While I'm all for
having meetings to discuss things in person, given the number of people &
the timezones, it's also helpful to have an async way to discuss this.
Publicly shared google docs seem to be the best option.  Even if we're not
ready for a design doc, a doc collecting use cases & needs would also be
helpful.

thanks for the explanations to my questions, that helps a lot -- I have
some minor follow up questions but that can wait.

On Wed, Dec 4, 2019 at 5:27 PM Ben Sidhom <si...@google.com.invalid> wrote:

> Hey Imran (and everybody who made it to the sync today):
>
> Thanks for the comments. Responses below:
>
>>> Scheduling and re-executing tasks
>>> Allow coordination between the service and the Spark DAG scheduler as to
>>> whether a given block/partition needs to be recomputed when a task fails or
>>> when shuffle block data cannot be read. Having such coordination is
>>> important, e.g., for suppressing recomputation after aborted executors or
>>> for forcing late recomputation if the service internally acts as a cache.
>>> One catchall solution is to have the shuffle manager provide an indication
>>> of whether shuffle data is external to executors (or nodes). Another
>>> option: allow the shuffle manager (likely on the driver) to be queried for
>>> the existence of shuffle data for a given executor ID (or perhaps map task,
>>> reduce task, etc). Note that this is at the level of data the scheduler is
>>> aware of (i.e., map/reduce partitions) rather than block IDs, which are
>>> internal details for some shuffle managers.
>>
>>
>> sounds reasonable, and I think @Matt Cheah  mentioned something like this
>> has come up with their work on SPARK-25299 and was going to be added even
>> for that work.  (of course, need to look at the actual proposal closely and
>> how it impacts the scheduler.)
>
>
> While this is something that was discussed before, it is not something
> that is *currently* in the scope of SPARK-25299. Given the number of
> parties who are doing async data pushes (either as a backup, as in the case
> of the proposal in SPARK-25299, or as the sole mechanism of data
> distribution), I expect this to be an issue at the forefront for many
> people. I have not yet written a specific proposal for how this should be
> done. Rather, I wanted to gauge how many others see this as an important
> issue and figure out the most reasonable solutions for the community as a
> whole. It sounds like people have been getting by using hacks so far.
> I would be curious to hear what does and does not work well and which
> solutions we would be OK with in Spark upstream.
>
>
>>> ShuffleManager API
>>> Add a heartbeat (keep-alive) mechanism to RDD shuffle output so that the
>>> service knows that data is still active. This is one way to enable
>>> time-/job-scoped data because a disaggregated shuffle service cannot rely
>>> on robust communication with Spark and in general has a distinct lifecycle
>>> from the Spark deployment(s) it talks to. This would likely take the form
>>> of a callback on ShuffleManager itself, but there are other approaches.
>>
>>
>
>> I believe this can already be done, but maybe it's much uglier than it
>> needs to be (though I don't recall the details off the top of my head).
>
>
> As far as I'm aware, this would need to be added out-of-band, e.g., by the
> ShuffleManager itself firing off its own heartbeat thread(s) (on the
> driver, executors, or both). While obviously this is possible, it's also
> prone to leaks and puts more burden on shuffle implementations. In fact, I
> don't have a robust understanding of the lifecycle of the ShuffleManager
> object itself. IIRC (from some ad-hoc tests I did a while back), a new one
> is spawned on each executor itself (as opposed to being instantiated once
> on the driver and deserialized onto executors). If executor
> (ShuffleManager) instances do not receive shutdown hooks, shuffle
> implementations may be prone to resource leaks. Worse, if the behavior of
> ShuffleManager instantiation is not stable between Spark releases, there
> may be correctness issues due to initializers/constructors running in
> unexpected ways. Then you have the ShuffleManager instance used for
> registration. As far as I can tell, this runs on the driver, but might this
> be migrated between machines (either now or in future Spark releases),
> e.g., in cluster mode?
>
> If this were taken care of by the Spark scheduler rather than the shuffle
> manager itself, we could avoid an entire class of subtle issues. My
> off-the-cuff suggestion above was to expose a callback on the
> ShuffleManager that allows implementations to define their own heartbeat
> logic. That could then be invoked by the scheduler when and where
> appropriate (along with any other lifecycle callbacks we might add).
>
>>> Add lifecycle hooks to shuffle readers and writers (e.g., to close/recycle
>>> connections/streams/file handles as well as provide commit semantics).
>>> SPARK-25299 adds commit semantics to the internal data storage layer, but
>>> this is applicable to all shuffle managers at a higher level and should
>>> apply equally to the ShuffleWriter.
>>
>>
>> ShuffleWriter has a
>>
>>> def stop(success: Boolean): Option[MapStatus]
>>
>>  I would need more info about why that isn't enough.  (But if there is a
>> need for it, yes this makes sense.)
>
>
> That's probably fine for most purposes. However, that stop hook only
> exists on shuffle writers. What about on readers? In any case, each
> reader/writer instance appears to only be invoked once for reading
> or writing. If ShuffleManagers can assume that behavior is stable, this
> point is less important. In any case, if we do intend to enable "external"
> shuffle implementations, we should make the APIs as explicit as possible
> and ensure we're enabling cleanup (and commits) wherever possible.
>
>>> Serialization
>>> Allow serializers to be used more flexibly and efficiently. For example,
>>> have serializers support writing an arbitrary number of objects into an
>>> existing OutputStream or ByteBuffer. This enables objects to be serialized
>>> to direct buffers where doing so makes sense. More importantly, it allows
>>> arbitrary metadata/framing data to be wrapped around individual objects
>>> cheaply. Right now, that’s only possible at the stream level. (There are
>>> hacks around this, but this would enable more idiomatic use in efficient
>>> shuffle implementations.)
>>
>>
>
>> I don't really understand how this is different from the existing
>> SerializationStream -- probably a small example would clarify.
>
>
> I illustrated the use case poorly above. It *can* be worked around as of
> now, but not cleanly-and-efficiently (you *can* get one at a time).
> Consider shuffle implementations that do not dump raw stream data to some
> storage service but need to frame serialized objects in some way. They are
> stuck jumping through hoops with the current SerializationStream structure
> (e.g., instantiating a fake/wrapper OutputStream and serializer instance
> for each frame or doing even worse trickery to avoid that allocation
> penalty). If serializers could write to an *existing* byte array
> or---better yet---a ByteBuffer, then this song and dance could be avoided.
>
> I would advocate for ByteBuffers as a first-class data sink as a
> performance optimization. This confers 2 benefits:
>
>    - Users of asynchronous byte channels don't have to copy data between
>    arrays and buffers or give up asynchronicity.
>    - Direct buffers avoid excess data copies and kernel boundary jumps
>    when writing to certain sinks.
>
> Now that I think about it, this *could* equally benefit the SPARK-25299
> use case where channels are used.
>
>>> Have serializers indicate whether they are deterministic. This provides
>>> much of the value of a shuffle service because it means that reducers do
>>> not need to spill to disk when reading/merging/combining inputs--the data
>>> can be grouped by the service, even without the service understanding data
>>> types or byte representations. Alternative (less preferable since it would
>>> break Java serialization, for example): require all serializers to be
>>> deterministic.
>>
>>
>
>> I really don't understand this one, sorry, can you elaborate more?  I'm
>> not sure what determinism has to do with spilling to disk.  There is
>> already supportsRelocationOfSerializedObjects , though that is private,
>> which seems related but I think you're talking about something else?
>
>
> First off, by deterministic serialization I mean literally that: one
> object (or two objects that are considered equal) will serialize to the
> same byte representation no matter when/how it is serialized. This point is
> about allowing external shuffle/merging services to operate on the
> key/value level without having to actually understand the byte
> representation of objects. Instead of merging *partitions*, shuffle
> managers can merge *data elements*. All of this can be done without
> shipping JVM Comparator functions (i.e., arbitrary code) to shuffle
> services.
>
> There are some dirty hacks/workarounds that can approximate this behavior
> even without strictly deterministic serialization, but we can only
> *guarantee* that shuffle readers (or writers for that matter) do not
> require local disk spill (no more local ExternalSorters) when we're working
> with deterministic serializers and a shuffle service that understands so.
>
> As far as I'm aware, supportsRelocationOfSerializedObjects only means that
> a given object can be moved around within a segment of serialized data.
> (For example, certain object graphs with cycles or other unusual data
> structures can be encoded but impose requirements on data stream ordering.)
> Note that serialized object relocation is a necessary but not sufficient
> condition for deterministic serialization (and spill-free shuffles).
>
>
>
> Anyway, there were a *lot* of people on the call today and we didn't get
> a chance to dig into the nitty-gritty details of these points. I would like
> to know what others think of these (not-fleshed-out) proposals, how they do
> (or do not) work with disaggregated shuffle implementations in the wild,
> and alternative workarounds that people have used so far. I'm particularly
> interested in learning how others have dealt with async writes and data
> reconciliation. Once I have that feedback, I'm happy to put out a more
> focused design doc that we can collect further comments on and iterate.
>
> On Wed, Dec 4, 2019 at 10:58 AM Imran Rashid <ir...@cloudera.com.invalid>
> wrote:
>
>> Hi Ben,
>>
>> in general everything you're proposing sounds reasonable.  For me, at
>> least, I'd need more details on most of the points before I fully
>> understand them, but I'm definitely in favor of the general goal for making
>> spark support fully disaggregated shuffle.  Of course, I also want to make
>> sure it can be done in a way that involves the least risky changes to spark
>> itself and we can continue to support.
>>
>> One very-high level point which I think is worth keeping in mind for the
>> wider community following this -- the key difference between what you are
>> proposing and SPARK-25299, is that SPARK-25299 still uses spark's existing
>> shuffle implementation, which leverages local disk.  Your goal is to better
>> support shuffling all data via some external service, which avoids shuffle
>> data hitting executors local disks entirely.  This was already possible, to
>> some extent, even before SPARK-25299 with the ShuffleManager api; but as
>> you note, there are shortcomings which need to be addressed.  (Historical
>> note: that api wasn't designed with totally distributed shuffle services in
>> mind, it was to support hash- vs. sort-based shuffle, all still on spark's
>> executors.)
>>
>> One thing that I thought you would have needed, but you didn't mention
>> here, is changes to the scheduler to add an extra step between the
>> shuffle-write & shuffle-read stages, if it needs to do any work to
>> reorganize data, I think I have heard this come up in prior discussions.
>>
>> A couple of inline comments below:
>>
>> On Fri, Nov 15, 2019 at 6:10 PM Ben Sidhom <si...@google.com.invalid>
>> wrote:
>>
>>> Proposal
>>> Scheduling and re-executing tasks
>>>
>>> Allow coordination between the service and the Spark DAG scheduler as to
>>> whether a given block/partition needs to be recomputed when a task fails or
>>> when shuffle block data cannot be read. Having such coordination is
>>> important, e.g., for suppressing recomputation after aborted executors or
>>> for forcing late recomputation if the service internally acts as a cache.
>>> One catchall solution is to have the shuffle manager provide an indication
>>> of whether shuffle data is external to executors (or nodes). Another
>>> option: allow the shuffle manager (likely on the driver) to be queried for
>>> the existence of shuffle data for a given executor ID (or perhaps map task,
>>> reduce task, etc). Note that this is at the level of data the scheduler is
>>> aware of (i.e., map/reduce partitions) rather than block IDs, which are
>>> internal details for some shuffle managers.
>>>
>>
>> sounds reasonable, and I think @Matt Cheah <mc...@palantir.com>
>> mentioned something like this has come up with their work on SPARK-25299
>> and was going to be added even for that work.  (of course, need to look at
>> the actual proposal closely and how it impacts the scheduler.)
>>
>>> ShuffleManager API
>>>
>>> Add a heartbeat (keep-alive) mechanism to RDD shuffle output so that the
>>> service knows that data is still active. This is one way to enable
>>> time-/job-scoped data because a disaggregated shuffle service cannot rely
>>> on robust communication with Spark and in general has a distinct lifecycle
>>> from the Spark deployment(s) it talks to. This would likely take the form
>>> of a callback on ShuffleManager itself, but there are other approaches.
>>>
>>
>> I believe this can already be done, but maybe its much uglier than it
>> needs to be (though I don't recall the details off the top of my head).
>>
>>
>>> Add lifecycle hooks to shuffle readers and writers (e.g., to
>>> close/recycle connections/streams/file handles as well as provide commit
>>> semantics). SPARK-25299 adds commit semantics to the internal data storage
>>> layer, but this is applicable to all shuffle managers at a higher level and
>>> should apply equally to the ShuffleWriter.
>>>
>>
>> ShuffleWriter has a
>>
>> def stop(success: Boolean): Option[MapStatus]
>>
>>  I would need more info about why that isn't enough.  (But if there is a
>> need for it, yes this makes sense.)
>>
>>> Serialization
>>>
>>> Allow serializers to be used more flexibly and efficiently. For example,
>>> have serializers support writing an arbitrary number of objects into an
>>> existing OutputStream or ByteBuffer. This enables objects to be serialized
>>> to direct buffers where doing so makes sense. More importantly, it allows
>>> arbitrary metadata/framing data to be wrapped around individual objects
>>> cheaply. Right now, that’s only possible at the stream level. (There are
>>> hacks around this, but this would enable more idiomatic use in efficient
>>> shuffle implementations.)
>>>
>>
>> I don't really understand how this is different from the existing
>> SerializationStream -- probably a small example would clarify.
>>
>>
>>> Have serializers indicate whether they are deterministic. This provides
>>> much of the value of a shuffle service because it means that reducers do
>>> not need to spill to disk when reading/merging/combining inputs--the data
>>> can be grouped by the service, even without the service understanding data
>>> types or byte representations. Alternative (less preferable since it would
>>> break Java serialization, for example): require all serializers to be
>>> deterministic.
>>>
>>
>> I really don't understand this one, sorry, can you elaborate more?  I'm
>> not sure what determinism has to do with spilling to disk.  There is
>> already supportsRelocationOfSerializedObjects , though that is private,
>> which seems related but I think you're talking about something else?
>>
>> thanks,
>> Imran
>>
>>>
>
> --
> -Ben
>

Re: Enabling fully disaggregated shuffle on Spark

Posted by Ben Sidhom <si...@google.com.INVALID>.
Hey Imran (and everybody who made it to the sync today):

Thanks for the comments. Responses below:

>> Scheduling and re-executing tasks
>> Allow coordination between the service and the Spark DAG scheduler as to
>> whether a given block/partition needs to be recomputed when a task fails or
>> when shuffle block data cannot be read. Having such coordination is
>> important, e.g., for suppressing recomputation after aborted executors or
>> for forcing late recomputation if the service internally acts as a cache.
>> One catchall solution is to have the shuffle manager provide an indication
>> of whether shuffle data is external to executors (or nodes). Another
>> option: allow the shuffle manager (likely on the driver) to be queried for
>> the existence of shuffle data for a given executor ID (or perhaps map task,
>> reduce task, etc). Note that this is at the level of data the scheduler is
>> aware of (i.e., map/reduce partitions) rather than block IDs, which are
>> internal details for some shuffle managers.
>
>
> sounds reasonable, and I think @Matt Cheah  mentioned something like this
> has come up with their work on SPARK-25299 and was going to be added even
> for that work.  (of course, need to look at the actual proposal closely and
> how it impacts the scheduler.)


While this is something that was discussed before, it is not something that
is *currently* in the scope of SPARK-25299. Given the number of parties who
are doing async data pushes (either as a backup, as in the case of the
proposal in SPARK-25299, or as the sole mechanism of data distribution), I
expect this to be an issue at the forefront for many people. I have not yet
written a specific proposal for how this should be done. Rather, I wanted
to gauge how many others see this as an important issue and figure out the
most reasonable solutions for the community as a whole. It sounds like
people have been getting by using hacks so far. I would be curious to
hear what does and does not work well and which solutions we would be OK
with in Spark upstream.
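
To make the two options from the quoted proposal concrete, here is a rough
sketch of what the scheduler-facing decision could look like. Everything in it
(ShuffleDataLocation, ShuffleOutputTracker, RecomputePolicy, InMemoryTracker)
is a hypothetical name made up for illustration; none of it exists in Spark,
and a real proposal would have to fit the DAG scheduler's actual bookkeeping.

```scala
// Hypothetical sketch only: these types are NOT part of Spark.

// Option 1: the shuffle manager says where its data lives.
sealed trait ShuffleDataLocation
case object ExecutorLocal extends ShuffleDataLocation // lost with the executor
case object External extends ShuffleDataLocation      // survives executor loss

// Option 2: the shuffle manager can be asked whether a map output still exists.
trait ShuffleOutputTracker {
  def dataLocation: ShuffleDataLocation
  def hasMapOutput(shuffleId: Int, mapId: Int): Boolean
}

object RecomputePolicy {
  // Decide whether a map output must be recomputed.
  def needsRecompute(
      tracker: ShuffleOutputTracker,
      shuffleId: Int,
      mapId: Int,
      executorLost: Boolean): Boolean =
    tracker.dataLocation match {
      // Local data disappears with its executor.
      case ExecutorLocal => executorLost || !tracker.hasMapOutput(shuffleId, mapId)
      // External data only needs recomputation if the service no longer has it
      // (for example, because the service acts as a cache and evicted it).
      case External => !tracker.hasMapOutput(shuffleId, mapId)
    }
}

// Toy tracker used to exercise the policy.
final class InMemoryTracker(
    val dataLocation: ShuffleDataLocation,
    present: Set[(Int, Int)]) extends ShuffleOutputTracker {
  def hasMapOutput(shuffleId: Int, mapId: Int): Boolean =
    present.contains((shuffleId, mapId))
}
```

With an External tracker that still holds the output, an executor loss does not
trigger recomputation; with an ExecutorLocal tracker it does.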


>> ShuffleManager API
>> Add a heartbeat (keep-alive) mechanism to RDD shuffle output so that the
>> service knows that data is still active. This is one way to enable
>> time-/job-scoped data because a disaggregated shuffle service cannot rely
>> on robust communication with Spark and in general has a distinct lifecycle
>> from the Spark deployment(s) it talks to. This would likely take the form
>> of a callback on ShuffleManager itself, but there are other approaches.
>
>

> I believe this can already be done, but maybe it's much uglier than it needs
> to be (though I don't recall the details off the top of my head).


As far as I'm aware, this would need to be added out-of-band, e.g., by the
ShuffleManager itself firing off its own heartbeat thread(s) (on the
driver, executors, or both). While obviously this is possible, it's also
prone to leaks and puts more burden on shuffle implementations. In fact, I
don't have a robust understanding of the lifecycle of the ShuffleManager
object itself. IIRC (from some ad-hoc tests I did a while back), a new one
is spawned on each executor itself (as opposed to being instantiated once
on the driver and deserialized onto executors). If executor
(ShuffleManager) instances do not receive shutdown hooks, shuffle
implementations may be prone to resource leaks. Worse, if the behavior of
ShuffleManager instantiation is not stable between Spark releases, there
may be correctness issues due to initializers/constructors running in
unexpected ways. Then you have the ShuffleManager instance used for
registration. As far as I can tell, this runs on the driver, but might this
be migrated between machines (either now or in future Spark releases),
e.g., in cluster mode?

If this were taken care of by the Spark scheduler rather than the shuffle
manager itself, we could avoid an entire class of subtle issues. My
off-the-cuff suggestion above was to expose a callback on the
ShuffleManager that allows implementations to define their own heartbeat
logic. That could then be invoked by the scheduler when and where
appropriate (along with any other lifecycle callbacks we might add).
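
As a rough illustration of the callback idea, the sketch below separates the
hook the scheduler would invoke from the lease bookkeeping a service might
keep. ShuffleHeartbeat and LeaseTable are hypothetical names, not Spark APIs,
and the explicit nowMillis parameter is an assumption made so the logic can be
exercised without a real clock.

```scala
import scala.collection.mutable

// Hypothetical callback: the scheduler (not the shuffle manager) decides when
// and where to call this for each shuffle that is still in use.
trait ShuffleHeartbeat {
  def heartbeat(shuffleId: Int, nowMillis: Long): Unit
}

// Hypothetical service-side bookkeeping: a shuffle's data is kept only while
// heartbeats keep arriving within the time-to-live.
final class LeaseTable(ttlMillis: Long) extends ShuffleHeartbeat {
  private val lastSeen = mutable.Map.empty[Int, Long]

  def heartbeat(shuffleId: Int, nowMillis: Long): Unit =
    lastSeen.update(shuffleId, nowMillis)

  def isLive(shuffleId: Int, nowMillis: Long): Boolean =
    lastSeen.get(shuffleId).exists(seen => nowMillis - seen <= ttlMillis)

  // Drops every shuffle whose lease has lapsed and returns the dropped ids.
  def expire(nowMillis: Long): Set[Int] = {
    val dead: Set[Int] =
      lastSeen.collect { case (id, seen) if nowMillis - seen > ttlMillis => id }.toSet
    lastSeen --= dead
    dead
  }
}
```

Because the implementation only supplies the callback body, it would not need
to manage its own heartbeat threads on the driver or executors.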

>> Add lifecycle hooks to shuffle readers and writers (e.g., to close/recycle
>> connections/streams/file handles as well as provide commit semantics).
>> SPARK-25299 adds commit semantics to the internal data storage layer, but
>> this is applicable to all shuffle managers at a higher level and should
>> apply equally to the ShuffleWriter.
>
>
> ShuffleWriter has a
>
>> def stop(success: Boolean): Option[MapStatus]
>
>  I would need more info about why that isn't enough.  (But if there is a
> need for it, yes this makes sense.)


That's probably fine for most purposes. However, that stop hook only exists
on shuffle writers. What about on readers? In any case, each reader/writer
instance appears to only be invoked once for reading or
writing. If ShuffleManagers can assume that behavior is stable, this point
is less important. In any case, if we do intend to enable "external"
shuffle implementations, we should make the APIs as explicit as possible
and ensure we're enabling cleanup (and commits) wherever possible.
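
One possible shape for explicit hooks that apply to readers and writers alike
is sketched below. ShuffleLifecycle, its use helper, and RecordingHandle are
hypothetical names for illustration only; this is not how Spark's ShuffleReader
or ShuffleWriter are defined today.

```scala
import scala.collection.mutable

// Hypothetical lifecycle shared by shuffle readers and writers.
trait ShuffleLifecycle extends AutoCloseable {
  def commit(): Unit // make the work durable/visible after success
  def abort(): Unit  // discard partial work after a failure
  def close(): Unit  // release connections/streams; always called
}

object ShuffleLifecycle {
  // Runs `body`, then commits on success or aborts on failure, and closes the
  // handle in either case.
  def use[H <: ShuffleLifecycle, A](handle: H)(body: H => A): A =
    try {
      val result = body(handle)
      handle.commit()
      result
    } catch {
      case e: Throwable =>
        handle.abort()
        throw e
    } finally {
      handle.close()
    }
}

// Toy handle that records which hooks were called, and in what order.
final class RecordingHandle extends ShuffleLifecycle {
  val events = mutable.ArrayBuffer.empty[String]
  def commit(): Unit = events += "commit"
  def abort(): Unit = events += "abort"
  def close(): Unit = events += "close"
}
```

A successful body yields the sequence commit, close; a body that throws yields
abort, close, and the exception still propagates to the caller.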

>> Serialization
>> Allow serializers to be used more flexibly and efficiently. For example,
>> have serializers support writing an arbitrary number of objects into an
>> existing OutputStream or ByteBuffer. This enables objects to be serialized
>> to direct buffers where doing so makes sense. More importantly, it allows
>> arbitrary metadata/framing data to be wrapped around individual objects
>> cheaply. Right now, that’s only possible at the stream level. (There are
>> hacks around this, but this would enable more idiomatic use in efficient
>> shuffle implementations.)
>
>

> I don't really understand how this is different from the existing
> SerializationStream -- probably a small example would clarify.


I illustrated the use case poorly above. It *can* be worked around as of
now, but not both cleanly and efficiently (you *can* get one or the other).
Consider shuffle implementations that do not dump raw stream data to some
storage service but need to frame serialized objects in some way. They are
stuck jumping through hoops with the current SerializationStream structure
(e.g., instantiating a fake/wrapper OutputStream and serializer instance
for each frame or doing even worse trickery to avoid that allocation
penalty). If serializers could write to an *existing* byte array
or---better yet---a ByteBuffer, then this song and dance could be avoided.

I would advocate for ByteBuffers as a first-class data sink as a
performance optimization. This confers two benefits:

   - Users of asynchronous byte channels don't have to copy data between
   arrays and buffers or give up asynchronicity.
   - Direct buffers avoid excess data copies and kernel boundary jumps when
   writing to certain sinks.

Now that I think about it, this *could* equally benefit the SPARK-25299 use
case where channels are used.
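
As a small example of the framing use case, the sketch below writes
length-prefixed records straight into an existing (here, direct) ByteBuffer
without a wrapper stream per frame. BufferSerializer and Framing are
hypothetical; Spark's serializer API does not currently offer a write-to-buffer
method like this, and the 4-byte length prefix is just one possible frame
format.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical serializer interface: appends one object at the buffer's
// current position instead of owning an output stream.
trait BufferSerializer[T] {
  def writeTo(value: T, buf: ByteBuffer): Unit
}

object Framing {
  // Stand-in serializer for the example; a real one could be Kryo-backed.
  val StringSerializer: BufferSerializer[String] = new BufferSerializer[String] {
    def writeTo(value: String, buf: ByteBuffer): Unit = buf.put(value.getBytes(UTF_8))
  }

  // Writes each record as [length: Int][payload bytes] into the same buffer.
  def writeFrames[T](records: Seq[T], ser: BufferSerializer[T], buf: ByteBuffer): Unit =
    records.foreach { record =>
      val lengthPos = buf.position()
      buf.putInt(0)                                          // reserve the length slot
      ser.writeTo(record, buf)                               // payload goes in place
      buf.putInt(lengthPos, buf.position() - lengthPos - 4)  // back-patch the length
    }

  // Reads the frames back; the buffer must already be flipped for reading.
  def readFrames(buf: ByteBuffer): List[String] = {
    val out = List.newBuilder[String]
    while (buf.hasRemaining) {
      val bytes = new Array[Byte](buf.getInt())
      buf.get(bytes)
      out += new String(bytes, UTF_8)
    }
    out.result()
  }
}
```

For instance, writing Seq("a", "bcd") into ByteBuffer.allocateDirect(64)
advances the position by 12 bytes (two 4-byte length prefixes plus 1 and 3
payload bytes), and the same buffer could then be handed to an asynchronous
channel without an intermediate array copy.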

>> Have serializers indicate whether they are deterministic. This provides
>> much of the value of a shuffle service because it means that reducers do
>> not need to spill to disk when reading/merging/combining inputs--the data
>> can be grouped by the service, even without the service understanding data
>> types or byte representations. Alternative (less preferable since it would
>> break Java serialization, for example): require all serializers to be
>> deterministic.
>
>

> I really don't understand this one, sorry, can you elaborate more?  I'm not
> sure what determinism has to do with spilling to disk.  There is already
> supportsRelocationOfSerializedObjects, though that is private, which seems
> related but I think you're talking about something else?


First off, by deterministic serialization I mean literally that: one object
(or two objects that are considered equal) will serialize to the same byte
representation no matter when/how it is serialized. This point is about
allowing external shuffle/merging services to operate on the key/value
level without having to actually understand the byte representation of
objects. Instead of merging *partitions*, shuffle managers can merge *data
elements*. All of this can be done without shipping JVM Comparator
functions (i.e., arbitrary code) to shuffle services.

There are some dirty hacks/workarounds that can approximate this behavior
even without strictly deterministic serialization, but we can only
*guarantee* that shuffle readers (or writers for that matter) do not
require local disk spill (no more local ExternalSorters) when we're working
with deterministic serializers and a shuffle service that is aware of this.
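
To illustrate what operating on the key/value level without understanding the
bytes could mean, here is a minimal sketch of a service grouping records purely
by the serialized key. OpaqueGrouping is a hypothetical name, and the sketch
only works under the stated assumption that equal keys always serialize to
identical bytes; without deterministic serialization, equal keys could land in
different groups.

```scala
import java.nio.ByteBuffer

object OpaqueGrouping {
  // Groups serialized (key, value) records by the raw key bytes. The service
  // never deserializes anything and needs no Comparator or user code.
  // Arrays compare by reference on the JVM, so each key is wrapped in a
  // ByteBuffer, whose equals/hashCode are based on the remaining bytes.
  def groupByKeyBytes(
      records: Seq[(Array[Byte], Array[Byte])]): Map[ByteBuffer, Seq[Array[Byte]]] =
    records
      .groupBy { case (key, _) => ByteBuffer.wrap(key) }
      .map { case (key, kvs) => key -> kvs.map(_._2) }
}
```

A reducer reading from such a service would receive values already grouped per
key, which is the property that lets it skip a local sort or spill.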

As far as I'm aware, supportsRelocationOfSerializedObjects only means that
a given object can be moved around within a segment of serialized data.
(For example, certain object graphs with cycles or other unusual data
structures can be encoded but impose requirements on data stream ordering.)
Note that serialized object relocation is a necessary but not sufficient
condition for deterministic serialization (and spill-free shuffles).



Anyway, there were a *lot* of people on the call today and we didn't get a
chance to dig into the nitty-gritty details of these points. I would like
to know what others think of these (not-fleshed-out) proposals, how they do
(or do not) work with disaggregated shuffle implementations in the wild,
and alternative workarounds that people have used so far. I'm particularly
interested in learning how others have dealt with async writes and data
reconciliation. Once I have that feedback, I'm happy to put out a more
focused design doc that we can collect further comments on and iterate.

On Wed, Dec 4, 2019 at 10:58 AM Imran Rashid <ir...@cloudera.com.invalid>
wrote:

> Hi Ben,
>
> In general, everything you're proposing sounds reasonable.  For me, at
> least, I'd need more details on most of the points before I fully
> understand them, but I'm definitely in favor of the general goal of making
> Spark support fully disaggregated shuffle.  Of course, I also want to make
> sure it can be done in a way that involves the least risky changes to Spark
> itself and that we can continue to support.
>
> One very high-level point which I think is worth keeping in mind for the
> wider community following this -- the key difference between what you are
> proposing and SPARK-25299 is that SPARK-25299 still uses Spark's existing
> shuffle implementation, which leverages local disk.  Your goal is to better
> support shuffling all data via some external service, which avoids shuffle
> data hitting executors' local disks entirely.  This was already possible, to
> some extent, even before SPARK-25299 with the ShuffleManager API; but as
> you note, there are shortcomings which need to be addressed.  (Historical
> note: that API wasn't designed with totally distributed shuffle services in
> mind; it was to support hash- vs. sort-based shuffle, all still on Spark's
> executors.)
>
> One thing that I thought you would have needed, but you didn't mention
> here, is changes to the scheduler to add an extra step between the
> shuffle-write & shuffle-read stages, if the shuffle service needs to do any
> work to reorganize data.  I think I have heard this come up in prior
> discussions.
>
> A couple of inline comments below:
>
> On Fri, Nov 15, 2019 at 6:10 PM Ben Sidhom <si...@google.com.invalid>
> wrote:
>
>> Proposal
>> Scheduling and re-executing tasks
>>
>> Allow coordination between the service and the Spark DAG scheduler as to
>> whether a given block/partition needs to be recomputed when a task fails or
>> when shuffle block data cannot be read. Having such coordination is
>> important, e.g., for suppressing recomputation after aborted executors or
>> for forcing late recomputation if the service internally acts as a cache.
>> One catchall solution is to have the shuffle manager provide an indication
>> of whether shuffle data is external to executors (or nodes). Another
>> option: allow the shuffle manager (likely on the driver) to be queried for
>> the existence of shuffle data for a given executor ID (or perhaps map task,
>> reduce task, etc). Note that this is at the level of data the scheduler is
>> aware of (i.e., map/reduce partitions) rather than block IDs, which are
>> internal details for some shuffle managers.
>>
>
> Sounds reasonable, and I think @Matt Cheah <mc...@palantir.com>
> mentioned something like this has come up with their work on SPARK-25299
> and was going to be added even for that work.  (Of course, we need to look
> closely at the actual proposal and how it impacts the scheduler.)
>
>> ShuffleManager API
>>
>> Add a heartbeat (keep-alive) mechanism to RDD shuffle output so that the
>> service knows that data is still active. This is one way to enable
>> time-/job-scoped data because a disaggregated shuffle service cannot rely
>> on robust communication with Spark and in general has a distinct lifecycle
>> from the Spark deployment(s) it talks to. This would likely take the form
>> of a callback on ShuffleManager itself, but there are other approaches.
>>
>
> I believe this can already be done, but maybe it's much uglier than it
> needs to be (though I don't recall the details off the top of my head).
>
>
>> Add lifecycle hooks to shuffle readers and writers (e.g., to
>> close/recycle connections/streams/file handles as well as provide commit
>> semantics). SPARK-25299 adds commit semantics to the internal data storage
>> layer, but this is applicable to all shuffle managers at a higher level and
>> should apply equally to the ShuffleWriter.
>>
>
> ShuffleWriter has a
>
> def stop(success: Boolean): Option[MapStatus]
>
> I would need more info about why that isn't enough.  (But if there is a
> need for it, yes this makes sense.)
>
>> Serialization
>>
>> Allow serializers to be used more flexibly and efficiently. For example,
>> have serializers support writing an arbitrary number of objects into an
>> existing OutputStream or ByteBuffer. This enables objects to be serialized
>> to direct buffers where doing so makes sense. More importantly, it allows
>> arbitrary metadata/framing data to be wrapped around individual objects
>> cheaply. Right now, that’s only possible at the stream level. (There are
>> hacks around this, but this would enable more idiomatic use in efficient
>> shuffle implementations.)
>>
>
> I don't really understand how this is different from the existing
> SerializationStream -- probably a small example would clarify.
>
>
>> Have serializers indicate whether they are deterministic. This provides
>> much of the value of a shuffle service because it means that reducers do
>> not need to spill to disk when reading/merging/combining inputs--the data
>> can be grouped by the service, even without the service understanding data
>> types or byte representations. Alternative (less preferable since it would
>> break Java serialization, for example): require all serializers to be
>> deterministic.
>>
>
> I really don't understand this one, sorry. Can you elaborate more?  I'm
> not sure what determinism has to do with spilling to disk.  There is
> already supportsRelocationOfSerializedObjects, though that is private,
> which seems related, but I think you're talking about something else?
>
> thanks,
> Imran
>
>>

-- 
-Ben

Re: Enabling fully disaggregated shuffle on Spark

Posted by Imran Rashid <ir...@cloudera.com.INVALID>.
Hi Ben,

In general, everything you're proposing sounds reasonable.  For me, at
least, I'd need more details on most of the points before I fully
understand them, but I'm definitely in favor of the general goal of making
Spark support fully disaggregated shuffle.  Of course, I also want to make
sure it can be done in a way that involves the least risky changes to Spark
itself and that we can continue to support.

One very high-level point which I think is worth keeping in mind for the
wider community following this -- the key difference between what you are
proposing and SPARK-25299 is that SPARK-25299 still uses Spark's existing
shuffle implementation, which leverages local disk.  Your goal is to better
support shuffling all data via some external service, which avoids shuffle
data hitting executors' local disks entirely.  This was already possible, to
some extent, even before SPARK-25299 with the ShuffleManager API; but as
you note, there are shortcomings which need to be addressed.  (Historical
note: that API wasn't designed with totally distributed shuffle services in
mind; it was to support hash- vs. sort-based shuffle, all still on Spark's
executors.)

One thing that I thought you would have needed, but you didn't mention
here, is changes to the scheduler to add an extra step between the
shuffle-write & shuffle-read stages, if the shuffle service needs to do any
work to reorganize data.  I think I have heard this come up in prior
discussions.

A couple of inline comments below:

On Fri, Nov 15, 2019 at 6:10 PM Ben Sidhom <si...@google.com.invalid>
wrote:

> Proposal
> Scheduling and re-executing tasks
>
> Allow coordination between the service and the Spark DAG scheduler as to
> whether a given block/partition needs to be recomputed when a task fails or
> when shuffle block data cannot be read. Having such coordination is
> important, e.g., for suppressing recomputation after aborted executors or
> for forcing late recomputation if the service internally acts as a cache.
> One catchall solution is to have the shuffle manager provide an indication
> of whether shuffle data is external to executors (or nodes). Another
> option: allow the shuffle manager (likely on the driver) to be queried for
> the existence of shuffle data for a given executor ID (or perhaps map task,
> reduce task, etc). Note that this is at the level of data the scheduler is
> aware of (i.e., map/reduce partitions) rather than block IDs, which are
> internal details for some shuffle managers.
>

Sounds reasonable, and I think @Matt Cheah <mc...@palantir.com> mentioned
something like this has come up with their work on SPARK-25299 and was
going to be added even for that work.  (Of course, we need to look closely
at the actual proposal and how it impacts the scheduler.)
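
For readers following along, the two options in the quoted proposal (a
"data is external" indication, and an existence query at map-partition
granularity) could be sketched roughly as follows. This is only an
illustration: ShuffleDataTracker, RecomputePolicy and every method name
below are hypothetical and do not exist in Spark.

```scala
// Hypothetical interface a shuffle manager might expose to the scheduler.
trait ShuffleDataTracker {
  // True when shuffle output lives outside executors (or nodes).
  def isExternal: Boolean
  // True when the output of the given map partition is still available.
  def hasMapOutput(shuffleId: Int, mapPartition: Int): Boolean
}

object RecomputePolicy {
  // Decide whether a map partition must be recomputed.
  def needsRecompute(
      tracker: ShuffleDataTracker,
      shuffleId: Int,
      mapPartition: Int,
      executorLost: Boolean): Boolean =
    if (tracker.isExternal) {
      // External data survives executor loss, but the service may have
      // evicted it (for example, when it acts as a cache).
      !tracker.hasMapOutput(shuffleId, mapPartition)
    } else {
      // Executor-local data is gone exactly when the executor is gone.
      executorLost
    }
}
```

The first branch covers both cases named in the proposal: recomputation is
suppressed after an executor is lost, and forced later if the service no
longer holds the data.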

> ShuffleManager API
>
> Add a heartbeat (keep-alive) mechanism to RDD shuffle output so that the
> service knows that data is still active. This is one way to enable
> time-/job-scoped data because a disaggregated shuffle service cannot rely
> on robust communication with Spark and in general has a distinct lifecycle
> from the Spark deployment(s) it talks to. This would likely take the form
> of a callback on ShuffleManager itself, but there are other approaches.
>

I believe this can already be done, but maybe it's much uglier than it needs
to be (though I don't recall the details off the top of my head).
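
As a point of reference for the discussion, the service side of such a
keep-alive could be as small as a lease table. The sketch below is an
assumption about one possible shape, not an existing Spark or shuffle
service API; ShuffleLeases and its methods are invented names, and the
class is deliberately not thread-safe.

```scala
// Service-side view of leases renewed by (hypothetical) heartbeats.
final class ShuffleLeases(ttlMillis: Long) {
  // Last heartbeat time per shuffle id.
  private var lastSeen = Map.empty[Int, Long]

  // Record a heartbeat for a shuffle whose data is still in use.
  def heartbeat(shuffleId: Int, nowMillis: Long): Unit =
    lastSeen = lastSeen.updated(shuffleId, nowMillis)

  // Shuffles whose last heartbeat is older than the TTL.
  def expired(nowMillis: Long): Set[Int] =
    lastSeen.collect {
      case (id, seen) if nowMillis - seen > ttlMillis => id
    }.toSet

  // Forget expired shuffles and return their ids so data can be deleted.
  def reclaim(nowMillis: Long): Set[Int] = {
    val dead = expired(nowMillis)
    lastSeen = lastSeen -- dead
    dead
  }
}
```

Because the service only needs the time of the last heartbeat, it can clean
up data on its own schedule even if the Spark application disappears
without notice.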


> Add lifecycle hooks to shuffle readers and writers (e.g., to close/recycle
> connections/streams/file handles as well as provide commit semantics).
> SPARK-25299 adds commit semantics to the internal data storage layer, but
> this is applicable to all shuffle managers at a higher level and should
> apply equally to the ShuffleWriter.
>

ShuffleWriter has a

def stop(success: Boolean): Option[MapStatus]

I would need more info about why that isn't enough.  (But if there is a
need for it, yes this makes sense.)
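
For comparison with stop(success), here is one hypothetical shape that
separate lifecycle hooks could take: commit and abort are distinct from
releasing resources, and close always runs. CommittableWriter and
WriterLifecycle are illustrative names only; this is a sketch of the idea
under discussion, not a proposed signature.

```scala
import scala.util.control.NonFatal

// Hypothetical writer contract with explicit commit/abort/close steps.
trait CommittableWriter[A] extends AutoCloseable {
  def write(records: Iterator[A]): Unit
  def commit(): Unit                // make the output visible to readers
  def abort(cause: Throwable): Unit // discard any partial output
}

object WriterLifecycle {
  // Drive one task: commit on success, abort on failure, always close.
  // Returns true when the output was committed.
  def run[A](writer: CommittableWriter[A], records: Iterator[A]): Boolean =
    try {
      writer.write(records)
      writer.commit()
      true
    } catch {
      case NonFatal(e) =>
        writer.abort(e)
        false
    } finally writer.close()
}
```

The difference from a single stop(success) call is that connections or file
handles are released in close regardless of outcome, while commit and abort
carry only the visibility decision.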

> Serialization
>
> Allow serializers to be used more flexibly and efficiently. For example,
> have serializers support writing an arbitrary number of objects into an
> existing OutputStream or ByteBuffer. This enables objects to be serialized
> to direct buffers where doing so makes sense. More importantly, it allows
> arbitrary metadata/framing data to be wrapped around individual objects
> cheaply. Right now, that’s only possible at the stream level. (There are
> hacks around this, but this would enable more idiomatic use in efficient
> shuffle implementations.)
>

I don't really understand how this is different from the existing
SerializationStream -- probably a small example would clarify.
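
One possible reading of the request, sketched with JDK streams only: each
object is serialized on its own and appended to a stream the caller already
owns, with a small frame (here a partition id and a length) wrapped around
it. FramedWrites, writeFramed, readFramed and the serialize parameter are
hypothetical stand-ins, and the frame layout is an assumption made for this
example.

```scala
import java.io.{ByteArrayInputStream, DataInputStream, DataOutputStream, OutputStream}

object FramedWrites {
  // Append each object to an existing stream as
  // [partition id: Int][payload length: Int][payload bytes].
  // `serialize` stands in for a per-object serializer call.
  def writeFramed[T](out: OutputStream, partition: Int, objects: Iterator[T])(
      serialize: T => Array[Byte]): Unit = {
    val data = new DataOutputStream(out)
    objects.foreach { obj =>
      val payload = serialize(obj)
      data.writeInt(partition)
      data.writeInt(payload.length)
      data.write(payload)
    }
    data.flush() // the caller owns `out`, so flush but do not close it
  }

  // Read frames back until the input is exhausted.
  def readFramed(bytes: Array[Byte]): List[(Int, Array[Byte])] = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    val frames = List.newBuilder[(Int, Array[Byte])]
    while (in.available() > 0) {
      val partition = in.readInt()
      val payload = new Array[Byte](in.readInt())
      in.readFully(payload)
      frames += ((partition, payload))
    }
    frames.result()
  }
}
```

Several calls can target the same stream, so records for different
partitions can be interleaved and still be split apart later without
deserializing the payloads.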


> Have serializers indicate whether they are deterministic. This provides
> much of the value of a shuffle service because it means that reducers do
> not need to spill to disk when reading/merging/combining inputs--the data
> can be grouped by the service, even without the service understanding data
> types or byte representations. Alternative (less preferable since it would
> break Java serialization, for example): require all serializers to be
> deterministic.
>

I really don't understand this one, sorry. Can you elaborate more?  I'm not
sure what determinism has to do with spilling to disk.  There is
already supportsRelocationOfSerializedObjects, though that is private,
which seems related, but I think you're talking about something else?

thanks,
Imran

>