You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by Ashish Rawat <As...@guavus.com> on 2015/08/27 09:42:02 UTC

FW: High Availability of Spark Driver

Hi Patrick,

As discussed in another thread, we are looking for a solution to the problem of lost state on Spark Driver failure. Can you please share Spark's long term strategy for resolving this problem.

<-- Original Mail Content Below -->

We have come across the problem of Spark Applications (on Yarn) requiring a restart in case of Spark Driver (or application master) going down. This is hugely inconvenient for long running applications which are maintaining a big state in memory. The repopulation of state in itself may require a downtime of many minutes, which is not acceptable for most live systems.

As you would have noticed that Yarn community has acknowledged "long running services" as an important class of use cases, and thus identified and removed problems in working with long running services in Yarn.
http://hortonworks.com/blog/support-long-running-services-hadoop-yarn-clusters/

It would be great if Spark, which is the most important processing engine on Yarn, also figures out issues in working with long running Spark applications and publishes recommendations or make framework changes for removing those. The need to keep the application running in case of Driver and Application Master failure, seems to be an important requirement from this perspective. The two most compelling use cases being:

  1.  Huge state of historical data in Spark Streaming, required for stream processing
  2.  Very large cached tables in Spark SQL (very close to our use case where we periodically cache RDDs and query using Spark SQL)

In our analysis, for both of these use cases, a working HA solution can be built by

  1.  Preserving the state of executors (not killing them on driver failures)
  2.  Persisting some meta info required by Spark SQL and Block Manager.
  3.  Restarting and reconnecting the Driver to AM and executors

This would preserve the cached data, enabling the application to come back quickly. This can further be extended for preserving and recovering the Computation State.

I would request you to share your thoughts on this issue and possible future directions.

Regards,
Ashish

Re: High Availability of Spark Driver

Posted by Ashish Rawat <As...@guavus.com>.
If anyone else is also facing similar problems and have figured out a good workaround within the current design, then please share.

Regards,
Ashish

From: Ashish Rawat <as...@guavus.com>>
Date: Thursday, 27 August 2015 1:12 pm
To: "dev@spark.apache.org<ma...@spark.apache.org>" <de...@spark.apache.org>>
Subject: FW: High Availability of Spark Driver

Hi Patrick,

As discussed in another thread, we are looking for a solution to the problem of lost state on Spark Driver failure. Can you please share Spark's long term strategy for resolving this problem.

<-- Original Mail Content Below -->

We have come across the problem of Spark Applications (on Yarn) requiring a restart in case of Spark Driver (or application master) going down. This is hugely inconvenient for long running applications which are maintaining a big state in memory. The repopulation of state in itself may require a downtime of many minutes, which is not acceptable for most live systems.

As you would have noticed that Yarn community has acknowledged "long running services" as an important class of use cases, and thus identified and removed problems in working with long running services in Yarn.
http://hortonworks.com/blog/support-long-running-services-hadoop-yarn-clusters/

It would be great if Spark, which is the most important processing engine on Yarn, also figures out issues in working with long running Spark applications and publishes recommendations or make framework changes for removing those. The need to keep the application running in case of Driver and Application Master failure, seems to be an important requirement from this perspective. The two most compelling use cases being:

  1.  Huge state of historical data in Spark Streaming, required for stream processing
  2.  Very large cached tables in Spark SQL (very close to our use case where we periodically cache RDDs and query using Spark SQL)

In our analysis, for both of these use cases, a working HA solution can be built by

  1.  Preserving the state of executors (not killing them on driver failures)
  2.  Persisting some meta info required by Spark SQL and Block Manager.
  3.  Restarting and reconnecting the Driver to AM and executors

This would preserve the cached data, enabling the application to come back quickly. This can further be extended for preserving and recovering the Computation State.

I would request you to share your thoughts on this issue and possible future directions.

Regards,
Ashish

Re: High Availability of Spark Driver

Posted by Chester Chen <ch...@alpinenow.com>.
Ashish and Steve
     I am also working on the long running Yarn Spark Job. Just start to
focus on failure recovery. This thread of discussion is really helpful.

Chester

On Fri, Aug 28, 2015 at 12:53 AM, Ashish Rawat <As...@guavus.com>
wrote:

> Thanks Steve. I had not spent many brain cycles on analysing the Yarn
> pieces, your insights would be extremely useful.
>
> I was also considering Zookeeper and Yarn registry for persisting state
> and sharing information. But for a basic POC, I used the file system and
> was able to
>
>    1. Preserve Executors.
>    2. Reconnect Executors back to Driver by storing the Executor
>    endpoints info into a local file system. When driver restarts, use this
>    info to send update driver message to executor endpoints. Executors can
>    then update all of their Akka endpoints and reconnect.
>    3. Reregister Block Manager and report back blocks. This utilises most
>    of Spark’s existing code, I only had to update the BlockManagerMaster
>    endpoint in executors.
>
> Surprisingly, Spark components took the restart in a much better way than
> I had anticipated and were easy to accept new work :-)
>
> I am still figuring out other complexities around preserving RDD lineage
> and computation. From my initial analysis, preserving the whole computation
> might be complex and may not be required. Perhaps, the lineage of only the
> cached RDDs can be preserved to recover any lost blocks.
>
> I am definitely not underestimating the effort, both within Spark and
> around interfacing with Yarn, but just trying to emphasise that a single
> node leading to full application restart, does not seem right for a long
> running service. Thoughts?
>
> Regards,
> Ashish
>
> From: Steve Loughran <st...@hortonworks.com>
> Date: Thursday, 27 August 2015 4:19 pm
> To: Ashish Rawat <as...@guavus.com>
> Cc: "dev@spark.apache.org" <de...@spark.apache.org>
> Subject: Re: High Availability of Spark Driver
>
>
> On 27 Aug 2015, at 08:42, Ashish Rawat <As...@guavus.com> wrote:
>
> Hi Patrick,
>
> As discussed in another thread, we are looking for a solution to the
> problem of lost state on Spark Driver failure. Can you please share Spark’s
> long term strategy for resolving this problem.
>
> <-- Original Mail Content Below -->
>
> We have come across the problem of Spark Applications (on Yarn) requiring
> a restart in case of Spark Driver (or application master) going down. This
> is hugely inconvenient for long running applications which are maintaining
> a big state in memory. The repopulation of state in itself may require a
> downtime of many minutes, which is not acceptable for most live systems.
>
> As you would have noticed that Yarn community has acknowledged "long
> running services" as an important class of use cases, and thus identified
> and removed problems in working with long running services in Yarn.
>
> http://hortonworks.com/blog/support-long-running-services-hadoop-yarn-clusters/
>
>
> Yeah, I spent a lot of time on that, or at least using the features, in
> other work under YARN-896, summarised in
> http://www.slideshare.net/steve_l/yarn-services
>
> It would be great if Spark, which is the most important processing engine
> on Yarn,
>
>
> I'f you look at the CPU-hours going in to the big hadoop clusters, it's
> actually MR work and things behind Hive. but: these apps don't attempt HA
>
> Why not? It requires whatever maintains the overall app status (spark: the
> driver) to persist that state in a way where it can be rebuilt. A restarted
> AM with the "retain containers" feature turned on gets nothing back from
> YARN except the list of previous allocated containers, and is left to sort
> itself out.
>
> also figures out issues in working with long running Spark applications
> and publishes recommendations or make framework changes for removing those.
> The need to keep the application running in case of Driver and Application
> Master failure, seems to be an important requirement from this perspective.
> The two most compelling use cases being:
>
>    1. Huge state of historical data in *Spark Streaming*, required for
>    stream processing
>    2. Very large cached tables in *Spark SQL* (very close to our use case
>    where we periodically cache RDDs and query using Spark SQL)
>
>
>
> Generally spark streaming is viewed as the big need here, but yes,
> long-lived cached data matters.
>
> Bear in mind that before Spark 1.5, you can't run any spark YARN app for
> longer than the expiry time of your delegation tokens, so in a secure
> cluster you have a limit of a couple of days anyway. Unless your cluster is
> particularly unreliable, AM failures are usually pretty unlikely in such a
> short timespan. Container failure is more likely as 1) you have more of
> them and 2) if you have pre-emption turned on in the scheduler or are
> pushing the work out to a label containing spot VMs, the will fail.
>
> In our analysis, for both of these use cases, a working HA solution can be
> built by
>
>    1. Preserving the state of executors (not killing them on driver
>    failures)
>
> This is a critical one
>
>
>    1. Persisting some meta info required by Spark SQL and Block Manager.
>
>
> again, needs a failure tolerant storage mechanism. HDFS and ZK can work
> together here, but your code needs to handle all the corner cases of
> inconsistency, including the "AM failure partway through state update"
> scenario.
>
> Sometimes you even need to reach for the mathematics, with TLA+ being the
> language of choice. Start with the ZK proof paper to see if you can get a
> vague idea about what it's up to -as that gives hints about how its
> behaviour may not be what you expect.
>
>
>    1. Restarting and reconnecting the Driver to AM and executors
>
>
> I don't know how Akka can recover from this. Existing long-lived YARN
> services use the Hadoop 2.6+ YARN registry, which was done with this
> purpose in mind. Example, slider: when the containers lose contact with the
> AM, they pol the registry to await a new AM entry.
>
> This would preserve the cached data, enabling the application to come back
> quickly. This can further be extended for preserving and recovering the
> Computation State.
>
>
> There's also
>
>    1. Credential recovery. Restarted AMs get an updated HDFS delegation
>    token by way of YARN, but nothing else.
>    2. Container/AM failure tracking to identify failing clusters. YARN
>    uses a weighted moving average to decide when an AM is unreliable; on
>    long-lived services the service itself should reach the same decisions
>    about containers and nodes.
>    3. Testing. You need to be confident that things are resilient to
>    failure and network partitions. Don't underestimate the effort here -Jepsen
>    shows what is needed ( https://aphyr.com/ ). Saying "Zookeeper handles
>    it all" doesn't magically fix things.
>
>
> I an HA runtime is ultimately a great thing to have —but don't
> underestimate the effort.
>
>
>
>
> I would request you to share your thoughts on this issue and possible
> future directions.
>
> Regards,
> Ashish
>
>
>

Re: High Availability of Spark Driver

Posted by Ashish Rawat <As...@guavus.com>.
Thanks Steve. I had not spent many brain cycles on analysing the Yarn pieces, your insights would be extremely useful.

I was also considering Zookeeper and Yarn registry for persisting state and sharing information. But for a basic POC, I used the file system and was able to

  1.  Preserve Executors.
  2.  Reconnect Executors back to Driver by storing the Executor endpoints info into a local file system. When driver restarts, use this info to send update driver message to executor endpoints. Executors can then update all of their Akka endpoints and reconnect.
  3.  Reregister Block Manager and report back blocks. This utilises most of Spark’s existing code, I only had to update the BlockManagerMaster endpoint in executors.

Surprisingly, Spark components took the restart in a much better way than I had anticipated and were easy to accept new work :-)

I am still figuring out other complexities around preserving RDD lineage and computation. From my initial analysis, preserving the whole computation might be complex and may not be required. Perhaps, the lineage of only the cached RDDs can be preserved to recover any lost blocks.

I am definitely not underestimating the effort, both within Spark and around interfacing with Yarn, but just trying to emphasise that a single node leading to full application restart, does not seem right for a long running service. Thoughts?

Regards,
Ashish

From: Steve Loughran <st...@hortonworks.com>>
Date: Thursday, 27 August 2015 4:19 pm
To: Ashish Rawat <as...@guavus.com>>
Cc: "dev@spark.apache.org<ma...@spark.apache.org>" <de...@spark.apache.org>>
Subject: Re: High Availability of Spark Driver


On 27 Aug 2015, at 08:42, Ashish Rawat <As...@guavus.com>> wrote:

Hi Patrick,

As discussed in another thread, we are looking for a solution to the problem of lost state on Spark Driver failure. Can you please share Spark’s long term strategy for resolving this problem.

<-- Original Mail Content Below -->

We have come across the problem of Spark Applications (on Yarn) requiring a restart in case of Spark Driver (or application master) going down. This is hugely inconvenient for long running applications which are maintaining a big state in memory. The repopulation of state in itself may require a downtime of many minutes, which is not acceptable for most live systems.

As you would have noticed that Yarn community has acknowledged "long running services" as an important class of use cases, and thus identified and removed problems in working with long running services in Yarn.
http://hortonworks.com/blog/support-long-running-services-hadoop-yarn-clusters/


Yeah, I spent a lot of time on that, or at least using the features, in other work under YARN-896, summarised in http://www.slideshare.net/steve_l/yarn-services

It would be great if Spark, which is the most important processing engine on Yarn,

I'f you look at the CPU-hours going in to the big hadoop clusters, it's actually MR work and things behind Hive. but: these apps don't attempt HA

Why not? It requires whatever maintains the overall app status (spark: the driver) to persist that state in a way where it can be rebuilt. A restarted AM with the "retain containers" feature turned on gets nothing back from YARN except the list of previous allocated containers, and is left to sort itself out.

also figures out issues in working with long running Spark applications and publishes recommendations or make framework changes for removing those. The need to keep the application running in case of Driver and Application Master failure, seems to be an important requirement from this perspective. The two most compelling use cases being:

  1.  Huge state of historical data in Spark Streaming, required for stream processing
  2.  Very large cached tables in Spark SQL (very close to our use case where we periodically cache RDDs and query using Spark SQL)


Generally spark streaming is viewed as the big need here, but yes, long-lived cached data matters.

Bear in mind that before Spark 1.5, you can't run any spark YARN app for longer than the expiry time of your delegation tokens, so in a secure cluster you have a limit of a couple of days anyway. Unless your cluster is particularly unreliable, AM failures are usually pretty unlikely in such a short timespan. Container failure is more likely as 1) you have more of them and 2) if you have pre-emption turned on in the scheduler or are pushing the work out to a label containing spot VMs, the will fail.

In our analysis, for both of these use cases, a working HA solution can be built by

  1.  Preserving the state of executors (not killing them on driver failures)

This is a critical one


  1.  Persisting some meta info required by Spark SQL and Block Manager.

again, needs a failure tolerant storage mechanism. HDFS and ZK can work together here, but your code needs to handle all the corner cases of inconsistency, including the "AM failure partway through state update" scenario.

Sometimes you even need to reach for the mathematics, with TLA+ being the language of choice. Start with the ZK proof paper to see if you can get a vague idea about what it's up to -as that gives hints about how its behaviour may not be what you expect.

  1.  Restarting and reconnecting the Driver to AM and executors

I don't know how Akka can recover from this. Existing long-lived YARN services use the Hadoop 2.6+ YARN registry, which was done with this purpose in mind. Example, slider: when the containers lose contact with the AM, they pol the registry to await a new AM entry.

This would preserve the cached data, enabling the application to come back quickly. This can further be extended for preserving and recovering the Computation State.


There's also

  1.  Credential recovery. Restarted AMs get an updated HDFS delegation token by way of YARN, but nothing else.
  2.  Container/AM failure tracking to identify failing clusters. YARN uses a weighted moving average to decide when an AM is unreliable; on long-lived services the service itself should reach the same decisions about containers and nodes.
  3.  Testing. You need to be confident that things are resilient to failure and network partitions. Don't underestimate the effort here -Jepsen shows what is needed ( https://aphyr.com/ ). Saying "Zookeeper handles it all" doesn't magically fix things.

I an HA runtime is ultimately a great thing to have —but don't underestimate the effort.




I would request you to share your thoughts on this issue and possible future directions.

Regards,
Ashish


Re: High Availability of Spark Driver

Posted by Steve Loughran <st...@hortonworks.com>.
On 27 Aug 2015, at 08:42, Ashish Rawat <As...@guavus.com>> wrote:

Hi Patrick,

As discussed in another thread, we are looking for a solution to the problem of lost state on Spark Driver failure. Can you please share Spark’s long term strategy for resolving this problem.

<-- Original Mail Content Below -->

We have come across the problem of Spark Applications (on Yarn) requiring a restart in case of Spark Driver (or application master) going down. This is hugely inconvenient for long running applications which are maintaining a big state in memory. The repopulation of state in itself may require a downtime of many minutes, which is not acceptable for most live systems.

As you would have noticed that Yarn community has acknowledged "long running services" as an important class of use cases, and thus identified and removed problems in working with long running services in Yarn.
http://hortonworks.com/blog/support-long-running-services-hadoop-yarn-clusters/


Yeah, I spent a lot of time on that, or at least using the features, in other work under YARN-896, summarised in http://www.slideshare.net/steve_l/yarn-services

It would be great if Spark, which is the most important processing engine on Yarn,

I'f you look at the CPU-hours going in to the big hadoop clusters, it's actually MR work and things behind Hive. but: these apps don't attempt HA

Why not? It requires whatever maintains the overall app status (spark: the driver) to persist that state in a way where it can be rebuilt. A restarted AM with the "retain containers" feature turned on gets nothing back from YARN except the list of previous allocated containers, and is left to sort itself out.

also figures out issues in working with long running Spark applications and publishes recommendations or make framework changes for removing those. The need to keep the application running in case of Driver and Application Master failure, seems to be an important requirement from this perspective. The two most compelling use cases being:

  1.  Huge state of historical data in Spark Streaming, required for stream processing
  2.  Very large cached tables in Spark SQL (very close to our use case where we periodically cache RDDs and query using Spark SQL)


Generally spark streaming is viewed as the big need here, but yes, long-lived cached data matters.

Bear in mind that before Spark 1.5, you can't run any spark YARN app for longer than the expiry time of your delegation tokens, so in a secure cluster you have a limit of a couple of days anyway. Unless your cluster is particularly unreliable, AM failures are usually pretty unlikely in such a short timespan. Container failure is more likely as 1) you have more of them and 2) if you have pre-emption turned on in the scheduler or are pushing the work out to a label containing spot VMs, the will fail.

In our analysis, for both of these use cases, a working HA solution can be built by

  1.  Preserving the state of executors (not killing them on driver failures)

This is a critical one


  1.  Persisting some meta info required by Spark SQL and Block Manager.

again, needs a failure tolerant storage mechanism. HDFS and ZK can work together here, but your code needs to handle all the corner cases of inconsistency, including the "AM failure partway through state update" scenario.

Sometimes you even need to reach for the mathematics, with TLA+ being the language of choice. Start with the ZK proof paper to see if you can get a vague idea about what it's up to -as that gives hints about how its behaviour may not be what you expect.

  1.  Restarting and reconnecting the Driver to AM and executors

I don't know how Akka can recover from this. Existing long-lived YARN services use the Hadoop 2.6+ YARN registry, which was done with this purpose in mind. Example, slider: when the containers lose contact with the AM, they pol the registry to await a new AM entry.

This would preserve the cached data, enabling the application to come back quickly. This can further be extended for preserving and recovering the Computation State.


There's also

  1.  Credential recovery. Restarted AMs get an updated HDFS delegation token by way of YARN, but nothing else.
  2.  Container/AM failure tracking to identify failing clusters. YARN uses a weighted moving average to decide when an AM is unreliable; on long-lived services the service itself should reach the same decisions about containers and nodes.
  3.  Testing. You need to be confident that things are resilient to failure and network partitions. Don't underestimate the effort here -Jepsen shows what is needed ( https://aphyr.com/ ). Saying "Zookeeper handles it all" doesn't magically fix things.

I an HA runtime is ultimately a great thing to have —but don't underestimate the effort.




I would request you to share your thoughts on this issue and possible future directions.

Regards,
Ashish