You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by Greg Lee <li...@gmail.com> on 2019/12/02 15:33:24 UTC

Re: Enabling fully disaggregated shuffle on Spark

Hi Felix & Ben,

This is Li Hao from Baidu, same team with Linhong.

As mentioned in Linhong’s email, independent disaggregated shuffle service
is also our solution and continuous exploring direction for  improving
stability of Hadoop MR and Spark in the production environment. We would
love to hear about this topic from community and share our experience .

Please add me to this event, thanks.

Best Regards
Li Hao

Liu,Linhong <li...@baidu.com> 于2019年11月29日周五 下午5:09写道:

> Hi Felix & Ben,
>
> This is Linhong from Baidu based in Beijing, and we are internally using a
> disaggregated shuffle service (we call it DCE) as well. We launched this in
> production 3 years ago for Hadoop shuffle. Last year we migrated spark
> shuffle to the same DCE shuffle service and stability improved a lot (we
> can handle more than 100T shuffle now).
>
> It would be nice if there is a Spark shuffle API support fully
> disaggregated shuffle and my team and I are very glad to share our
> experience and help on this topic.
>
> So, if It’s possible, please add me to this event.
>
>
>
> Thanks,
>
> Liu, Linhong
>
>
>
> *From: *Aniket Mokashi <an...@gmail.com>
> *Date: *Thursday, November 21, 2019 at 2:12 PM
> *To: *Felix Cheung <fe...@hotmail.com>
> *Cc: *Ben Sidhom <si...@google.com.invalid>, John Zhuge <
> jzhuge@apache.org>, bo yang <bo...@gmail.com>, Amogh Margoor <
> amoghm@qubole.com>, Ryan Blue <rb...@netflix.com>, Spark Dev List <
> dev@spark.apache.org>, Christopher Crosbie <cr...@google.com>,
> Griselda Cuevas <gr...@google.com>, Holden Karau <ho...@pigscanfly.ca>,
> Mayank Ahuja <ma...@qubole.com>, Kalyan Sivakumar <ka...@qubole.com>, "
> alfozan@fb.com" <al...@fb.com>, Felix Cheung <fe...@uber.com>, Matt
> Cheah <mc...@palantir.com>, "Yifei Huang (PD)" <yi...@palantir.com>
> *Subject: *Re: Enabling fully disaggregated shuffle on Spark
>
>
>
> Felix - please add me to this event.
>
>
>
> Ben - should we move this proposal to a doc and open it up for
> edits/comments.
>
>
>
> On Wed, Nov 20, 2019 at 5:37 PM Felix Cheung <fe...@hotmail.com>
> wrote:
>
> Great!
>
>
>
> Due to number of constraints I won’t be sending link directly here but
> please r me and I will add you.
>
>
>
>
> ------------------------------
>
> *From:* Ben Sidhom <si...@google.com.INVALID>
> *Sent:* Wednesday, November 20, 2019 9:10:01 AM
> *To:* John Zhuge <jz...@apache.org>
> *Cc:* bo yang <bo...@gmail.com>; Amogh Margoor <am...@qubole.com>;
> Ryan Blue <rb...@netflix.com>; Ben Sidhom <si...@google.com.invalid>;
> Spark Dev List <de...@spark.apache.org>; Christopher Crosbie <
> crosbiec@google.com>; Griselda Cuevas <gr...@google.com>; Holden Karau <
> holden@pigscanfly.ca>; Mayank Ahuja <ma...@qubole.com>; Kalyan Sivakumar
> <ka...@qubole.com>; alfozan@fb.com <al...@fb.com>; Felix Cheung <
> felixc@uber.com>; Matt Cheah <mc...@palantir.com>; Yifei Huang (PD) <
> yifeih@palantir.com>
> *Subject:* Re: Enabling fully disaggregated shuffle on Spark
>
>
>
> That sounds great!
>
>
>
> On Wed, Nov 20, 2019 at 9:02 AM John Zhuge <jz...@apache.org> wrote:
>
> That will be great. Please send us the invite.
>
>
>
> On Wed, Nov 20, 2019 at 8:56 AM bo yang <bo...@gmail.com> wrote:
>
> Cool, thanks Ryan, John, Amogh for the reply! Great to see you interested!
> Felix will have a Spark Scalability & Reliability Sync meeting on Dec 4 1pm
> PST. We could discuss more details there. Do you want to join?
>
>
>
> On Tue, Nov 19, 2019 at 4:23 PM Amogh Margoor <am...@qubole.com> wrote:
>
> We at Qubole are also looking at disaggregating shuffle on Spark. Would
> love to collaborate and share learnings.
>
>
>
> Regards,
>
> Amogh
>
>
>
> On Tue, Nov 19, 2019 at 4:09 PM John Zhuge <jz...@apache.org> wrote:
>
> Great work, Bo! Would love to hear the details.
>
>
>
>
>
> On Tue, Nov 19, 2019 at 4:05 PM Ryan Blue <rb...@netflix.com.invalid>
> wrote:
>
> I'm interested in remote shuffle services as well. I'd love to hear about
> what you're using in production!
>
>
>
> rb
>
>
>
> On Tue, Nov 19, 2019 at 2:43 PM bo yang <bo...@gmail.com> wrote:
>
> Hi Ben,
>
>
>
> Thanks for the writing up! This is Bo from Uber. I am in Felix's team in
> Seattle, and working on disaggregated shuffle (we called it remote shuffle
> service, RSS, internally). We have put RSS into production for a while, and
> learned a lot during the work (tried quite a few techniques to improve the
> remote shuffle performance). We could share our learning with the
> community, and also would like to hear feedback/suggestions on how to
> further improve remote shuffle performance. We could chat more details if
> you or other people are interested.
>
>
>
> Best,
>
> Bo
>
>
>
> On Fri, Nov 15, 2019 at 4:10 PM Ben Sidhom <si...@google.com.invalid>
> wrote:
>
> I would like to start a conversation about extending the Spark shuffle
> manager surface to support fully disaggregated shuffle implementations.
> This is closely related to the work in SPARK-25299
> <https://issues.apache.org/jira/browse/SPARK-25299>, which is focused on
> refactoring the shuffle manager API (and in particular, SortShuffleManager)
> to use a pluggable storage backend. The motivation for that SPIP is further
> enabling Spark on Kubernetes.
>
>
>
> The motivation for this proposal is enabling full externalized
> (disaggregated) shuffle service implementations. (Facebook’s Cosco shuffle
> <https://databricks.com/session/cosco-an-efficient-facebook-scale-shuffle-service>
> is one example of such a disaggregated shuffle service.) These changes
> allow the bulk of the shuffle to run in a remote service such that minimal
> state resides in executors and local disk spill is minimized. The net
> effect is increased job stability and performance improvements in certain
> scenarios. These changes should work well with or are complementary to
> SPARK-25299. Some or all points may be merged into that issue as
> appropriate.
>
>
>
> Below is a description of each component of this proposal. These changes
> can ideally be introduced incrementally. I would like to gather feedback
> and gauge interest from others in the community to collaborate on this.
> There are likely more points that would  be useful to disaggregated shuffle
> services. We can outline a more concrete plan after gathering enough input.
> A working session could help us kick off this joint effort; maybe something
> in the mid-January to mid-February timeframe (depending on interest and
> availability. I’m happy to host at our Sunnyvale, CA offices.
>
>
> Proposal Scheduling and re-executing tasks
>
> Allow coordination between the service and the Spark DAG scheduler as to
> whether a given block/partition needs to be recomputed when a task fails or
> when shuffle block data cannot be read. Having such coordination is
> important, e.g., for suppressing recomputation after aborted executors or
> for forcing late recomputation if the service internally acts as a cache.
> One catchall solution is to have the shuffle manager provide an indication
> of whether shuffle data is external to executors (or nodes). Another
> option: allow the shuffle manager (likely on the driver) to be queried for
> the existence of shuffle data for a given executor ID (or perhaps map task,
> reduce task, etc). Note that this is at the level of data the scheduler is
> aware of (i.e., map/reduce partitions) rather than block IDs, which are
> internal details for some shuffle managers.
> ShuffleManager API
>
> Add a heartbeat (keep-alive) mechanism to RDD shuffle output so that the
> service knows that data is still active. This is one way to enable
> time-/job-scoped data because a disaggregated shuffle service cannot rely
> on robust communication with Spark and in general has a distinct lifecycle
> from the Spark deployment(s) it talks to. This would likely take the form
> of a callback on ShuffleManager itself, but there are other approaches.
>
>
>
> Add lifecycle hooks to shuffle readers and writers (e.g., to close/recycle
> connections/streams/file handles as well as provide commit semantics).
> SPARK-25299 adds commit semantics to the internal data storage layer, but
> this is applicable to all shuffle managers at a higher level and should
> apply equally to the ShuffleWriter.
>
>
>
> Do not require ShuffleManagers to expose ShuffleBlockResolvers where they
> are not needed. Ideally, this would be an implementation detail of the
> shuffle manager itself. If there is substantial overlap between the
> SortShuffleManager and other implementations, then the storage details can
> be abstracted at the appropriate level. (SPARK-25299 does not currently
> change this.)
>
>
>
> Do not require MapStatus to include blockmanager IDs where they are not
> relevant. This is captured by ShuffleBlockInfo
> <https://docs.google.com/document/d/1d6egnL6WHOwWZe8MWv3m8n4PToNacdx7n_0iMSWwhCQ/edit#heading=h.imi27prnziyj>
> including an *optional* BlockManagerId in SPARK-25299. However, this
> change should be lifted to the MapStatus level so that it applies to all
> ShuffleManagers. Alternatively, use a more general data-location
> abstraction than BlockManagerId. This gives the shuffle manager more
> flexibility and the scheduler more information with respect to data
> residence.
> Serialization
>
> Allow serializers to be used more flexibly and efficiently. For example,
> have serializers support writing an arbitrary number of objects into an
> existing OutputStream or ByteBuffer. This enables objects to be serialized
> to direct buffers where doing so makes sense. More importantly, it allows
> arbitrary metadata/framing data to be wrapped around individual objects
> cheaply. Right now, that’s only possible at the stream level. (There are
> hacks around this, but this would enable more idiomatic use in efficient
> shuffle implementations.)
>
>
>
> Have serializers indicate whether they are deterministic. This provides
> much of the value of a shuffle service because it means that reducers do
> not need to spill to disk when reading/merging/combining inputs--the data
> can be grouped by the service, even without the service understanding data
> types or byte representations. Alternative (less preferable since it would
> break Java serialization, for example): require all serializers to be
> deterministic.
>
>
>
>
>
> --
>
> - Ben
>
>
>
>
> --
>
> Ryan Blue
>
> Software Engineer
>
> Netflix
>
>
>
>
> --
>
> John Zhuge
>
>
>
>
> --
>
> John Zhuge
>
>
>
>
> --
>
> -Ben
>
>
>
>
> --
>
> "...:::Aniket:::... Quetzalco@tl"
>

RE: Enabling fully disaggregated shuffle on Spark

Posted by "Jia, Ke A" <ke...@intel.com>.
Hi Ben and Felix,
This is Jia Ke from Intel Big Data Team. And I'm also interested in this. Would you please add me to the invite, thanks a lot.

Best regards,
Jia Ke
From: Qi,He <qi...@baidu.com>
Sent: Thursday, December 05, 2019 11:12 AM
To: Saisai Shao <sa...@gmail.com>
Cc: Liu,Linhong <li...@baidu.com>; Aniket Mokashi <an...@gmail.com>; Felix Cheung <fe...@hotmail.com>; Ben Sidhom <si...@google.com.invalid>; John Zhuge <jz...@apache.org>; bo yang <bo...@gmail.com>; Amogh Margoor <am...@qubole.com>; Ryan Blue <rb...@netflix.com>; Spark Dev List <de...@spark.apache.org>; Christopher Crosbie <cr...@google.com>; Griselda Cuevas <gr...@google.com>; Holden Karau <ho...@pigscanfly.ca>; Mayank Ahuja <ma...@qubole.com>; Kalyan Sivakumar <ka...@qubole.com>; alfozan@fb.com; Felix Cheung <fe...@uber.com>; Matt Cheah <mc...@palantir.com>; Yifei Huang (PD) <yi...@palantir.com>
Subject: Re: Enabling fully disaggregated shuffle on Spark

Hi Ben and Felix

This is Qi He from Baidu,same team with Linhong,I’m also interested in this. Would you please add me to the invite, thanks a lot.

Thanks
Qi, He

发件人: Saisai Shao <sa...@gmail.com>>
日期: 2019年12月4日 星期三 下午5:57
至: Greg Lee <li...@gmail.com>>
抄送: "Liu,Linhong" <li...@baidu.com>>, Aniket Mokashi <an...@gmail.com>>, Felix Cheung <fe...@hotmail.com>>, Ben Sidhom <si...@google.com.invalid>>, John Zhuge <jz...@apache.org>>, bo yang <bo...@gmail.com>>, Amogh Margoor <am...@qubole.com>>, Ryan Blue <rb...@netflix.com>>, Spark Dev List <de...@spark.apache.org>>, Christopher Crosbie <cr...@google.com>>, Griselda Cuevas <gr...@google.com>>, Holden Karau <ho...@pigscanfly.ca>>, Mayank Ahuja <ma...@qubole.com>>, Kalyan Sivakumar <ka...@qubole.com>>, "alfozan@fb.com<ma...@fb.com>" <al...@fb.com>>, Felix Cheung <fe...@uber.com>>, Matt Cheah <mc...@palantir.com>>, "Yifei Huang (PD)" <yi...@palantir.com>>
主题: Re: Enabling fully disaggregated shuffle on Spark

Hi Ben and Felix, I'm also interested in this. Would you please add me to the invite, thanks a lot.

Best regards,
Saisai

Greg Lee <li...@gmail.com>> 于2019年12月2日周一 下午11:34写道:
Hi Felix & Ben,

This is Li Hao from Baidu, same team with Linhong.

As mentioned in Linhong’s email, independent disaggregated shuffle service is also our solution and continuous exploring direction for  improving stability of Hadoop MR and Spark in the production environment. We would love to hear about this topic from community and share our experience .

Please add me to this event, thanks.

Best Regards
Li Hao

Liu,Linhong <li...@baidu.com>> 于2019年11月29日周五 下午5:09写道:
Hi Felix & Ben,
This is Linhong from Baidu based in Beijing, and we are internally using a disaggregated shuffle service (we call it DCE) as well. We launched this in production 3 years ago for Hadoop shuffle. Last year we migrated spark shuffle to the same DCE shuffle service and stability improved a lot (we can handle more than 100T shuffle now).
It would be nice if there is a Spark shuffle API support fully disaggregated shuffle and my team and I are very glad to share our experience and help on this topic.
So, if It’s possible, please add me to this event.

Thanks,
Liu, Linhong

From: Aniket Mokashi <an...@gmail.com>>
Date: Thursday, November 21, 2019 at 2:12 PM
To: Felix Cheung <fe...@hotmail.com>>
Cc: Ben Sidhom <si...@google.com.invalid>>, John Zhuge <jz...@apache.org>>, bo yang <bo...@gmail.com>>, Amogh Margoor <am...@qubole.com>>, Ryan Blue <rb...@netflix.com>>, Spark Dev List <de...@spark.apache.org>>, Christopher Crosbie <cr...@google.com>>, Griselda Cuevas <gr...@google.com>>, Holden Karau <ho...@pigscanfly.ca>>, Mayank Ahuja <ma...@qubole.com>>, Kalyan Sivakumar <ka...@qubole.com>>, "alfozan@fb.com<ma...@fb.com>" <al...@fb.com>>, Felix Cheung <fe...@uber.com>>, Matt Cheah <mc...@palantir.com>>, "Yifei Huang (PD)" <yi...@palantir.com>>
Subject: Re: Enabling fully disaggregated shuffle on Spark

Felix - please add me to this event.

Ben - should we move this proposal to a doc and open it up for edits/comments.

On Wed, Nov 20, 2019 at 5:37 PM Felix Cheung <fe...@hotmail.com>> wrote:
Great!

Due to number of constraints I won’t be sending link directly here but please r me and I will add you.


________________________________
From: Ben Sidhom <si...@google.com.INVALID>>
Sent: Wednesday, November 20, 2019 9:10:01 AM
To: John Zhuge <jz...@apache.org>>
Cc: bo yang <bo...@gmail.com>>; Amogh Margoor <am...@qubole.com>>; Ryan Blue <rb...@netflix.com>>; Ben Sidhom <si...@google.com.invalid>>; Spark Dev List <de...@spark.apache.org>>; Christopher Crosbie <cr...@google.com>>; Griselda Cuevas <gr...@google.com>>; Holden Karau <ho...@pigscanfly.ca>>; Mayank Ahuja <ma...@qubole.com>>; Kalyan Sivakumar <ka...@qubole.com>>; alfozan@fb.com<ma...@fb.com> <al...@fb.com>>; Felix Cheung <fe...@uber.com>>; Matt Cheah <mc...@palantir.com>>; Yifei Huang (PD) <yi...@palantir.com>>
Subject: Re: Enabling fully disaggregated shuffle on Spark

That sounds great!

On Wed, Nov 20, 2019 at 9:02 AM John Zhuge <jz...@apache.org>> wrote:
That will be great. Please send us the invite.

On Wed, Nov 20, 2019 at 8:56 AM bo yang <bo...@gmail.com>> wrote:
Cool, thanks Ryan, John, Amogh for the reply! Great to see you interested! Felix will have a Spark Scalability & Reliability Sync meeting on Dec 4 1pm PST. We could discuss more details there. Do you want to join?

On Tue, Nov 19, 2019 at 4:23 PM Amogh Margoor <am...@qubole.com>> wrote:
We at Qubole are also looking at disaggregating shuffle on Spark. Would love to collaborate and share learnings.

Regards,
Amogh

On Tue, Nov 19, 2019 at 4:09 PM John Zhuge <jz...@apache.org>> wrote:
Great work, Bo! Would love to hear the details.


On Tue, Nov 19, 2019 at 4:05 PM Ryan Blue <rb...@netflix.com.invalid>> wrote:
I'm interested in remote shuffle services as well. I'd love to hear about what you're using in production!

rb

On Tue, Nov 19, 2019 at 2:43 PM bo yang <bo...@gmail.com>> wrote:
Hi Ben,

Thanks for the writing up! This is Bo from Uber. I am in Felix's team in Seattle, and working on disaggregated shuffle (we called it remote shuffle service, RSS, internally). We have put RSS into production for a while, and learned a lot during the work (tried quite a few techniques to improve the remote shuffle performance). We could share our learning with the community, and also would like to hear feedback/suggestions on how to further improve remote shuffle performance. We could chat more details if you or other people are interested.

Best,
Bo

On Fri, Nov 15, 2019 at 4:10 PM Ben Sidhom <si...@google.com.invalid>> wrote:

I would like to start a conversation about extending the Spark shuffle manager surface to support fully disaggregated shuffle implementations. This is closely related to the work inSPARK-25299<https://issues.apache.org/jira/browse/SPARK-25299>, which is focused on refactoring the shuffle manager API (and in particular, SortShuffleManager) to use a pluggable storage backend. The motivation for that SPIP is further enabling Spark on Kubernetes.



The motivation for this proposal is enabling full externalized (disaggregated) shuffle service implementations. (Facebook’s Cosco shuffle<https://databricks.com/session/cosco-an-efficient-facebook-scale-shuffle-service> is one example of such a disaggregated shuffle service.) These changes allow the bulk of the shuffle to run in a remote service such that minimal state resides in executors and local disk spill is minimized. The net effect is increased job stability and performance improvements in certain scenarios. These changes should work well with or are complementary to SPARK-25299. Some or all points may be merged into that issue as appropriate.



Below is a description of each component of this proposal. These changes can ideally be introduced incrementally. I would like to gather feedback and gauge interest from others in the community to collaborate on this. There are likely more points that would  be useful to disaggregated shuffle services. We can outline a more concrete plan after gathering enough input. A working session could help us kick off this joint effort; maybe something in the mid-January to mid-February timeframe (depending on interest and availability. I’m happy to host at our Sunnyvale, CA offices.



Proposal
Scheduling and re-executing tasks

Allow coordination between the service and the Spark DAG scheduler as to whether a given block/partition needs to be recomputed when a task fails or when shuffle block data cannot be read. Having such coordination is important, e.g., for suppressing recomputation after aborted executors or for forcing late recomputation if the service internally acts as a cache. One catchall solution is to have the shuffle manager provide an indication of whether shuffle data is external to executors (or nodes). Another option: allow the shuffle manager (likely on the driver) to be queried for the existence of shuffle data for a given executor ID (or perhaps map task, reduce task, etc). Note that this is at the level of data the scheduler is aware of (i.e., map/reduce partitions) rather than block IDs, which are internal details for some shuffle managers.

ShuffleManager API

Add a heartbeat (keep-alive) mechanism to RDD shuffle output so that the service knows that data is still active. This is one way to enable time-/job-scoped data because a disaggregated shuffle service cannot rely on robust communication with Spark and in general has a distinct lifecycle from the Spark deployment(s) it talks to. This would likely take the form of a callback on ShuffleManager itself, but there are other approaches.



Add lifecycle hooks to shuffle readers and writers (e.g., to close/recycle connections/streams/file handles as well as provide commit semantics). SPARK-25299 adds commit semantics to the internal data storage layer, but this is applicable to all shuffle managers at a higher level and should apply equally to the ShuffleWriter.



Do not require ShuffleManagers to expose ShuffleBlockResolvers where they are not needed. Ideally, this would be an implementation detail of the shuffle manager itself. If there is substantial overlap between the SortShuffleManager and other implementations, then the storage details can be abstracted at the appropriate level. (SPARK-25299 does not currently change this.)



Do not require MapStatus to include blockmanager IDs where they are not relevant. This is captured byShuffleBlockInfo<https://docs.google.com/document/d/1d6egnL6WHOwWZe8MWv3m8n4PToNacdx7n_0iMSWwhCQ/edit#heading=h.imi27prnziyj> including an optional BlockManagerId in SPARK-25299. However, this change should be lifted to the MapStatus level so that it applies to all ShuffleManagers. Alternatively, use a more general data-location abstraction than BlockManagerId. This gives the shuffle manager more flexibility and the scheduler more information with respect to data residence.

Serialization

Allow serializers to be used more flexibly and efficiently. For example, have serializers support writing an arbitrary number of objects into an existing OutputStream or ByteBuffer. This enables objects to be serialized to direct buffers where doing so makes sense. More importantly, it allows arbitrary metadata/framing data to be wrapped around individual objects cheaply. Right now, that’s only possible at the stream level. (There are hacks around this, but this would enable more idiomatic use in efficient shuffle implementations.)



Have serializers indicate whether they are deterministic. This provides much of the value of a shuffle service because it means that reducers do not need to spill to disk when reading/merging/combining inputs--the data can be grouped by the service, even without the service understanding data types or byte representations. Alternative (less preferable since it would break Java serialization, for example): require all serializers to be deterministic.





--

- Ben


--
Ryan Blue
Software Engineer
Netflix


--
John Zhuge


--
John Zhuge


--
-Ben


--
"...:::Aniket:::... Quetzalco@tl"

Re: Enabling fully disaggregated shuffle on Spark

Posted by "Qi,He" <qi...@baidu.com>.
Hi Ben and Felix

This is Qi He from Baidu,same team with Linhong,I’m also interested in this. Would you please add me to the invite, thanks a lot.

Thanks
Qi, He

发件人: Saisai Shao <sa...@gmail.com>>
日期: 2019年12月4日 星期三 下午5:57
至: Greg Lee <li...@gmail.com>>
抄送: "Liu,Linhong" <li...@baidu.com>>, Aniket Mokashi <an...@gmail.com>>, Felix Cheung <fe...@hotmail.com>>, Ben Sidhom <si...@google.com.invalid>>, John Zhuge <jz...@apache.org>>, bo yang <bo...@gmail.com>>, Amogh Margoor <am...@qubole.com>>, Ryan Blue <rb...@netflix.com>>, Spark Dev List <de...@spark.apache.org>>, Christopher Crosbie <cr...@google.com>>, Griselda Cuevas <gr...@google.com>>, Holden Karau <ho...@pigscanfly.ca>>, Mayank Ahuja <ma...@qubole.com>>, Kalyan Sivakumar <ka...@qubole.com>>, "alfozan@fb.com<ma...@fb.com>" <al...@fb.com>>, Felix Cheung <fe...@uber.com>>, Matt Cheah <mc...@palantir.com>>, "Yifei Huang (PD)" <yi...@palantir.com>>
主题: Re: Enabling fully disaggregated shuffle on Spark

Hi Ben and Felix, I'm also interested in this. Would you please add me to the invite, thanks a lot.

Best regards,
Saisai

Greg Lee <li...@gmail.com>> 于2019年12月2日周一 下午11:34写道:
Hi Felix & Ben,

This is Li Hao from Baidu, same team with Linhong.

As mentioned in Linhong’s email, independent disaggregated shuffle service is also our solution and continuous exploring direction for  improving stability of Hadoop MR and Spark in the production environment. We would love to hear about this topic from community and share our experience .

Please add me to this event, thanks.

Best Regards
Li Hao

Liu,Linhong <li...@baidu.com>> 于2019年11月29日周五 下午5:09写道:
Hi Felix & Ben,
This is Linhong from Baidu based in Beijing, and we are internally using a disaggregated shuffle service (we call it DCE) as well. We launched this in production 3 years ago for Hadoop shuffle. Last year we migrated spark shuffle to the same DCE shuffle service and stability improved a lot (we can handle more than 100T shuffle now).
It would be nice if there is a Spark shuffle API support fully disaggregated shuffle and my team and I are very glad to share our experience and help on this topic.
So, if It’s possible, please add me to this event.

Thanks,
Liu, Linhong

From: Aniket Mokashi <an...@gmail.com>>
Date: Thursday, November 21, 2019 at 2:12 PM
To: Felix Cheung <fe...@hotmail.com>>
Cc: Ben Sidhom <si...@google.com.invalid>>, John Zhuge <jz...@apache.org>>, bo yang <bo...@gmail.com>>, Amogh Margoor <am...@qubole.com>>, Ryan Blue <rb...@netflix.com>>, Spark Dev List <de...@spark.apache.org>>, Christopher Crosbie <cr...@google.com>>, Griselda Cuevas <gr...@google.com>>, Holden Karau <ho...@pigscanfly.ca>>, Mayank Ahuja <ma...@qubole.com>>, Kalyan Sivakumar <ka...@qubole.com>>, "alfozan@fb.com<ma...@fb.com>" <al...@fb.com>>, Felix Cheung <fe...@uber.com>>, Matt Cheah <mc...@palantir.com>>, "Yifei Huang (PD)" <yi...@palantir.com>>
Subject: Re: Enabling fully disaggregated shuffle on Spark

Felix - please add me to this event.

Ben - should we move this proposal to a doc and open it up for edits/comments.

On Wed, Nov 20, 2019 at 5:37 PM Felix Cheung <fe...@hotmail.com>> wrote:
Great!

Due to number of constraints I won’t be sending link directly here but please r me and I will add you.


________________________________
From: Ben Sidhom <si...@google.com.INVALID>>
Sent: Wednesday, November 20, 2019 9:10:01 AM
To: John Zhuge <jz...@apache.org>>
Cc: bo yang <bo...@gmail.com>>; Amogh Margoor <am...@qubole.com>>; Ryan Blue <rb...@netflix.com>>; Ben Sidhom <si...@google.com.invalid>>; Spark Dev List <de...@spark.apache.org>>; Christopher Crosbie <cr...@google.com>>; Griselda Cuevas <gr...@google.com>>; Holden Karau <ho...@pigscanfly.ca>>; Mayank Ahuja <ma...@qubole.com>>; Kalyan Sivakumar <ka...@qubole.com>>; alfozan@fb.com<ma...@fb.com> <al...@fb.com>>; Felix Cheung <fe...@uber.com>>; Matt Cheah <mc...@palantir.com>>; Yifei Huang (PD) <yi...@palantir.com>>
Subject: Re: Enabling fully disaggregated shuffle on Spark

That sounds great!

On Wed, Nov 20, 2019 at 9:02 AM John Zhuge <jz...@apache.org>> wrote:
That will be great. Please send us the invite.

On Wed, Nov 20, 2019 at 8:56 AM bo yang <bo...@gmail.com>> wrote:
Cool, thanks Ryan, John, Amogh for the reply! Great to see you interested! Felix will have a Spark Scalability & Reliability Sync meeting on Dec 4 1pm PST. We could discuss more details there. Do you want to join?

On Tue, Nov 19, 2019 at 4:23 PM Amogh Margoor <am...@qubole.com>> wrote:
We at Qubole are also looking at disaggregating shuffle on Spark. Would love to collaborate and share learnings.

Regards,
Amogh

On Tue, Nov 19, 2019 at 4:09 PM John Zhuge <jz...@apache.org>> wrote:
Great work, Bo! Would love to hear the details.


On Tue, Nov 19, 2019 at 4:05 PM Ryan Blue <rb...@netflix.com.invalid>> wrote:
I'm interested in remote shuffle services as well. I'd love to hear about what you're using in production!

rb

On Tue, Nov 19, 2019 at 2:43 PM bo yang <bo...@gmail.com>> wrote:
Hi Ben,

Thanks for the writing up! This is Bo from Uber. I am in Felix's team in Seattle, and working on disaggregated shuffle (we called it remote shuffle service, RSS, internally). We have put RSS into production for a while, and learned a lot during the work (tried quite a few techniques to improve the remote shuffle performance). We could share our learning with the community, and also would like to hear feedback/suggestions on how to further improve remote shuffle performance. We could chat more details if you or other people are interested.

Best,
Bo

On Fri, Nov 15, 2019 at 4:10 PM Ben Sidhom <si...@google.com.invalid>> wrote:

I would like to start a conversation about extending the Spark shuffle manager surface to support fully disaggregated shuffle implementations. This is closely related to the work inSPARK-25299<https://issues.apache.org/jira/browse/SPARK-25299>, which is focused on refactoring the shuffle manager API (and in particular, SortShuffleManager) to use a pluggable storage backend. The motivation for that SPIP is further enabling Spark on Kubernetes.



The motivation for this proposal is enabling full externalized (disaggregated) shuffle service implementations. (Facebook’s Cosco shuffle<https://databricks.com/session/cosco-an-efficient-facebook-scale-shuffle-service> is one example of such a disaggregated shuffle service.) These changes allow the bulk of the shuffle to run in a remote service such that minimal state resides in executors and local disk spill is minimized. The net effect is increased job stability and performance improvements in certain scenarios. These changes should work well with or are complementary to SPARK-25299. Some or all points may be merged into that issue as appropriate.



Below is a description of each component of this proposal. These changes can ideally be introduced incrementally. I would like to gather feedback and gauge interest from others in the community to collaborate on this. There are likely more points that would  be useful to disaggregated shuffle services. We can outline a more concrete plan after gathering enough input. A working session could help us kick off this joint effort; maybe something in the mid-January to mid-February timeframe (depending on interest and availability. I’m happy to host at our Sunnyvale, CA offices.



Proposal
Scheduling and re-executing tasks

Allow coordination between the service and the Spark DAG scheduler as to whether a given block/partition needs to be recomputed when a task fails or when shuffle block data cannot be read. Having such coordination is important, e.g., for suppressing recomputation after aborted executors or for forcing late recomputation if the service internally acts as a cache. One catchall solution is to have the shuffle manager provide an indication of whether shuffle data is external to executors (or nodes). Another option: allow the shuffle manager (likely on the driver) to be queried for the existence of shuffle data for a given executor ID (or perhaps map task, reduce task, etc). Note that this is at the level of data the scheduler is aware of (i.e., map/reduce partitions) rather than block IDs, which are internal details for some shuffle managers.

ShuffleManager API

Add a heartbeat (keep-alive) mechanism to RDD shuffle output so that the service knows that data is still active. This is one way to enable time-/job-scoped data because a disaggregated shuffle service cannot rely on robust communication with Spark and in general has a distinct lifecycle from the Spark deployment(s) it talks to. This would likely take the form of a callback on ShuffleManager itself, but there are other approaches.



Add lifecycle hooks to shuffle readers and writers (e.g., to close/recycle connections/streams/file handles as well as provide commit semantics). SPARK-25299 adds commit semantics to the internal data storage layer, but this is applicable to all shuffle managers at a higher level and should apply equally to the ShuffleWriter.



Do not require ShuffleManagers to expose ShuffleBlockResolvers where they are not needed. Ideally, this would be an implementation detail of the shuffle manager itself. If there is substantial overlap between the SortShuffleManager and other implementations, then the storage details can be abstracted at the appropriate level. (SPARK-25299 does not currently change this.)



Do not require MapStatus to include blockmanager IDs where they are not relevant. This is captured byShuffleBlockInfo<https://docs.google.com/document/d/1d6egnL6WHOwWZe8MWv3m8n4PToNacdx7n_0iMSWwhCQ/edit#heading=h.imi27prnziyj> including an optional BlockManagerId in SPARK-25299. However, this change should be lifted to the MapStatus level so that it applies to all ShuffleManagers. Alternatively, use a more general data-location abstraction than BlockManagerId. This gives the shuffle manager more flexibility and the scheduler more information with respect to data residence.

Serialization

Allow serializers to be used more flexibly and efficiently. For example, have serializers support writing an arbitrary number of objects into an existing OutputStream or ByteBuffer. This enables objects to be serialized to direct buffers where doing so makes sense. More importantly, it allows arbitrary metadata/framing data to be wrapped around individual objects cheaply. Right now, that’s only possible at the stream level. (There are hacks around this, but this would enable more idiomatic use in efficient shuffle implementations.)



Have serializers indicate whether they are deterministic. This provides much of the value of a shuffle service because it means that reducers do not need to spill to disk when reading/merging/combining inputs--the data can be grouped by the service, even without the service understanding data types or byte representations. Alternative (less preferable since it would break Java serialization, for example): require all serializers to be deterministic.





--

- Ben


--
Ryan Blue
Software Engineer
Netflix


--
John Zhuge


--
John Zhuge


--
-Ben


--
"...:::Aniket:::... Quetzalco@tl"

Re: Enabling fully disaggregated shuffle on Spark

Posted by Saisai Shao <sa...@gmail.com>.
Hi Ben and Felix, I'm also interested in this. Would you please add me to
the invite, thanks a lot.

Best regards,
Saisai

Greg Lee <li...@gmail.com> 于2019年12月2日周一 下午11:34写道:

> Hi Felix & Ben,
>
> This is Li Hao from Baidu, same team with Linhong.
>
> As mentioned in Linhong’s email, independent disaggregated shuffle service
> is also our solution and continuous exploring direction for  improving
> stability of Hadoop MR and Spark in the production environment. We would
> love to hear about this topic from community and share our experience .
>
> Please add me to this event, thanks.
>
> Best Regards
> Li Hao
>
> Liu,Linhong <li...@baidu.com> 于2019年11月29日周五 下午5:09写道:
>
>> Hi Felix & Ben,
>>
>> This is Linhong from Baidu based in Beijing, and we are internally using
>> a disaggregated shuffle service (we call it DCE) as well. We launched this
>> in production 3 years ago for Hadoop shuffle. Last year we migrated spark
>> shuffle to the same DCE shuffle service and stability improved a lot (we
>> can handle more than 100T shuffle now).
>>
>> It would be nice if there is a Spark shuffle API support fully
>> disaggregated shuffle and my team and I are very glad to share our
>> experience and help on this topic.
>>
>> So, if It’s possible, please add me to this event.
>>
>>
>>
>> Thanks,
>>
>> Liu, Linhong
>>
>>
>>
>> *From: *Aniket Mokashi <an...@gmail.com>
>> *Date: *Thursday, November 21, 2019 at 2:12 PM
>> *To: *Felix Cheung <fe...@hotmail.com>
>> *Cc: *Ben Sidhom <si...@google.com.invalid>, John Zhuge <
>> jzhuge@apache.org>, bo yang <bo...@gmail.com>, Amogh Margoor <
>> amoghm@qubole.com>, Ryan Blue <rb...@netflix.com>, Spark Dev List <
>> dev@spark.apache.org>, Christopher Crosbie <cr...@google.com>,
>> Griselda Cuevas <gr...@google.com>, Holden Karau <ho...@pigscanfly.ca>,
>> Mayank Ahuja <ma...@qubole.com>, Kalyan Sivakumar <ka...@qubole.com>,
>> "alfozan@fb.com" <al...@fb.com>, Felix Cheung <fe...@uber.com>, Matt
>> Cheah <mc...@palantir.com>, "Yifei Huang (PD)" <yi...@palantir.com>
>> *Subject: *Re: Enabling fully disaggregated shuffle on Spark
>>
>>
>>
>> Felix - please add me to this event.
>>
>>
>>
>> Ben - should we move this proposal to a doc and open it up for
>> edits/comments.
>>
>>
>>
>> On Wed, Nov 20, 2019 at 5:37 PM Felix Cheung <fe...@hotmail.com>
>> wrote:
>>
>> Great!
>>
>>
>>
>> Due to number of constraints I won’t be sending link directly here but
>> please r me and I will add you.
>>
>>
>>
>>
>> ------------------------------
>>
>> *From:* Ben Sidhom <si...@google.com.INVALID>
>> *Sent:* Wednesday, November 20, 2019 9:10:01 AM
>> *To:* John Zhuge <jz...@apache.org>
>> *Cc:* bo yang <bo...@gmail.com>; Amogh Margoor <am...@qubole.com>;
>> Ryan Blue <rb...@netflix.com>; Ben Sidhom <si...@google.com.invalid>;
>> Spark Dev List <de...@spark.apache.org>; Christopher Crosbie <
>> crosbiec@google.com>; Griselda Cuevas <gr...@google.com>; Holden Karau <
>> holden@pigscanfly.ca>; Mayank Ahuja <ma...@qubole.com>; Kalyan
>> Sivakumar <ka...@qubole.com>; alfozan@fb.com <al...@fb.com>; Felix
>> Cheung <fe...@uber.com>; Matt Cheah <mc...@palantir.com>; Yifei Huang
>> (PD) <yi...@palantir.com>
>> *Subject:* Re: Enabling fully disaggregated shuffle on Spark
>>
>>
>>
>> That sounds great!
>>
>>
>>
>> On Wed, Nov 20, 2019 at 9:02 AM John Zhuge <jz...@apache.org> wrote:
>>
>> That will be great. Please send us the invite.
>>
>>
>>
>> On Wed, Nov 20, 2019 at 8:56 AM bo yang <bo...@gmail.com> wrote:
>>
>> Cool, thanks Ryan, John, Amogh for the reply! Great to see you
>> interested! Felix will have a Spark Scalability & Reliability Sync
>> meeting on Dec 4 1pm PST. We could discuss more details there. Do you want
>> to join?
>>
>>
>>
>> On Tue, Nov 19, 2019 at 4:23 PM Amogh Margoor <am...@qubole.com> wrote:
>>
>> We at Qubole are also looking at disaggregating shuffle on Spark. Would
>> love to collaborate and share learnings.
>>
>>
>>
>> Regards,
>>
>> Amogh
>>
>>
>>
>> On Tue, Nov 19, 2019 at 4:09 PM John Zhuge <jz...@apache.org> wrote:
>>
>> Great work, Bo! Would love to hear the details.
>>
>>
>>
>>
>>
>> On Tue, Nov 19, 2019 at 4:05 PM Ryan Blue <rb...@netflix.com.invalid>
>> wrote:
>>
>> I'm interested in remote shuffle services as well. I'd love to hear about
>> what you're using in production!
>>
>>
>>
>> rb
>>
>>
>>
>> On Tue, Nov 19, 2019 at 2:43 PM bo yang <bo...@gmail.com> wrote:
>>
>> Hi Ben,
>>
>>
>>
>> Thanks for the writing up! This is Bo from Uber. I am in Felix's team in
>> Seattle, and working on disaggregated shuffle (we called it remote shuffle
>> service, RSS, internally). We have put RSS into production for a while, and
>> learned a lot during the work (tried quite a few techniques to improve the
>> remote shuffle performance). We could share our learning with the
>> community, and also would like to hear feedback/suggestions on how to
>> further improve remote shuffle performance. We could chat more details if
>> you or other people are interested.
>>
>>
>>
>> Best,
>>
>> Bo
>>
>>
>>
>> On Fri, Nov 15, 2019 at 4:10 PM Ben Sidhom <si...@google.com.invalid>
>> wrote:
>>
>> I would like to start a conversation about extending the Spark shuffle
>> manager surface to support fully disaggregated shuffle implementations.
>> This is closely related to the work in SPARK-25299
>> <https://issues.apache.org/jira/browse/SPARK-25299>, which is focused on
>> refactoring the shuffle manager API (and in particular, SortShuffleManager)
>> to use a pluggable storage backend. The motivation for that SPIP is further
>> enabling Spark on Kubernetes.
>>
>>
>>
>> The motivation for this proposal is enabling full externalized
>> (disaggregated) shuffle service implementations. (Facebook’s Cosco
>> shuffle
>> <https://databricks.com/session/cosco-an-efficient-facebook-scale-shuffle-service>
>> is one example of such a disaggregated shuffle service.) These changes
>> allow the bulk of the shuffle to run in a remote service such that minimal
>> state resides in executors and local disk spill is minimized. The net
>> effect is increased job stability and performance improvements in certain
>> scenarios. These changes should work well with or are complementary to
>> SPARK-25299. Some or all points may be merged into that issue as
>> appropriate.
>>
>>
>>
>> Below is a description of each component of this proposal. These changes
>> can ideally be introduced incrementally. I would like to gather feedback
>> and gauge interest from others in the community to collaborate on this.
>> There are likely more points that would  be useful to disaggregated shuffle
>> services. We can outline a more concrete plan after gathering enough input.
>> A working session could help us kick off this joint effort; maybe something
>> in the mid-January to mid-February timeframe (depending on interest and
>> availability. I’m happy to host at our Sunnyvale, CA offices.
>>
>>
>> Proposal Scheduling and re-executing tasks
>>
>> Allow coordination between the service and the Spark DAG scheduler as to
>> whether a given block/partition needs to be recomputed when a task fails or
>> when shuffle block data cannot be read. Having such coordination is
>> important, e.g., for suppressing recomputation after aborted executors or
>> for forcing late recomputation if the service internally acts as a cache.
>> One catchall solution is to have the shuffle manager provide an indication
>> of whether shuffle data is external to executors (or nodes). Another
>> option: allow the shuffle manager (likely on the driver) to be queried for
>> the existence of shuffle data for a given executor ID (or perhaps map task,
>> reduce task, etc). Note that this is at the level of data the scheduler is
>> aware of (i.e., map/reduce partitions) rather than block IDs, which are
>> internal details for some shuffle managers.
>> ShuffleManager API
>>
>> Add a heartbeat (keep-alive) mechanism to RDD shuffle output so that the
>> service knows that data is still active. This is one way to enable
>> time-/job-scoped data because a disaggregated shuffle service cannot rely
>> on robust communication with Spark and in general has a distinct lifecycle
>> from the Spark deployment(s) it talks to. This would likely take the form
>> of a callback on ShuffleManager itself, but there are other approaches.
>>
>>
>>
>> Add lifecycle hooks to shuffle readers and writers (e.g., to
>> close/recycle connections/streams/file handles as well as provide commit
>> semantics). SPARK-25299 adds commit semantics to the internal data storage
>> layer, but this is applicable to all shuffle managers at a higher level and
>> should apply equally to the ShuffleWriter.
>>
>>
>>
>> Do not require ShuffleManagers to expose ShuffleBlockResolvers where they
>> are not needed. Ideally, this would be an implementation detail of the
>> shuffle manager itself. If there is substantial overlap between the
>> SortShuffleManager and other implementations, then the storage details can
>> be abstracted at the appropriate level. (SPARK-25299 does not currently
>> change this.)
>>
>>
>>
>> Do not require MapStatus to include blockmanager IDs where they are not
>> relevant. This is captured by ShuffleBlockInfo
>> <https://docs.google.com/document/d/1d6egnL6WHOwWZe8MWv3m8n4PToNacdx7n_0iMSWwhCQ/edit#heading=h.imi27prnziyj>
>> including an *optional* BlockManagerId in SPARK-25299. However, this
>> change should be lifted to the MapStatus level so that it applies to all
>> ShuffleManagers. Alternatively, use a more general data-location
>> abstraction than BlockManagerId. This gives the shuffle manager more
>> flexibility and the scheduler more information with respect to data
>> residence.
>> Serialization
>>
>> Allow serializers to be used more flexibly and efficiently. For example,
>> have serializers support writing an arbitrary number of objects into an
>> existing OutputStream or ByteBuffer. This enables objects to be serialized
>> to direct buffers where doing so makes sense. More importantly, it allows
>> arbitrary metadata/framing data to be wrapped around individual objects
>> cheaply. Right now, that’s only possible at the stream level. (There are
>> hacks around this, but this would enable more idiomatic use in efficient
>> shuffle implementations.)
>>
>>
>>
>> Have serializers indicate whether they are deterministic. This provides
>> much of the value of a shuffle service because it means that reducers do
>> not need to spill to disk when reading/merging/combining inputs--the data
>> can be grouped by the service, even without the service understanding data
>> types or byte representations. Alternative (less preferable since it would
>> break Java serialization, for example): require all serializers to be
>> deterministic.
>>
>>
>>
>>
>>
>> --
>>
>> - Ben
>>
>>
>>
>>
>> --
>>
>> Ryan Blue
>>
>> Software Engineer
>>
>> Netflix
>>
>>
>>
>>
>> --
>>
>> John Zhuge
>>
>>
>>
>>
>> --
>>
>> John Zhuge
>>
>>
>>
>>
>> --
>>
>> -Ben
>>
>>
>>
>>
>> --
>>
>> "...:::Aniket:::... Quetzalco@tl"
>>
>