Posted to reviews@spark.apache.org by JoshRosen <gi...@git.apache.org> on 2014/09/21 21:28:22 UTC

[GitHub] spark pull request: [SPARK-3626] [WIP] Replace AsyncRDDActions wit...

GitHub user JoshRosen opened a pull request:

    https://github.com/apache/spark/pull/2482

    [SPARK-3626] [WIP] Replace AsyncRDDActions with a more general runAsync() mechanism

    ### Background
    
    The `AsyncRDDActions` methods were introduced in e33b1839e27249e232a2126cec67a38109e03243, the first pull request to add support for job cancellation.  A follow-up pull request, 599dcb0ddf740e028cc8faac163303be8f9400a6, added cancellation on a per-job-group basis.  Quoting from that PR:
    
    > This PR adds a simple API to group together a set of jobs belonging to a thread and threads spawned from it. It also allows the cancellation of all jobs in this group.
    > 
    >     An example:
    > 
    >         sc.setJobDescription("this_is_the_group_id", "some job description")
    >         sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.count()
    > 
    >     In a separate thread:
    > 
    >         sc.cancelJobGroup("this_is_the_group_id")
    
    In its current form, `AsyncRDDActions` seems to serve no purpose other than to enable job cancellation.  `AsyncRDDActions` is marked as `@Experimental`, so users may be reluctant to depend on it.  Every new action we add also needs an asynchronous version, which creates a maintenance burden.
    
    ### Proposal
    
    I propose that we remove `AsyncRDDActions` and use job groups as the only user-facing API for job cancellation.  To make job groups more convenient for users, this pull request adds a `runAsync` method to SparkContext that makes it easy to run an asynchronous computation in a particular Spark job group.  For example:
    
    ```scala
    // Instead of countAsync(), we call the regular actions from a runAsync block:
    val futureCount: RunAsyncResult[Long] = sc.runAsync {
       sc.parallelize(...).map(...).count()
    }
    
    // This returns a Future:
    futureCount.onSuccess { case c => println(s"Got count $c!") }
    
    // The future also supports cancellation
    futureCount.cancel()
    
    // This also works with blocks that call multiple actions (e.g. an iterative ML algorithm)
    val futureResult = sc.runAsync {
       val rdd = sc.parallelize(...)
       val count = rdd.count()
       val first = rdd.first()
       first  // this is the result of the block
    } 
    ```
    
    I refactored `JobCancellationSuite` to use this new API instead of AsyncRDDActions; see that file for more examples.
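
    As a rough illustration of the shape of this proposal (not the code in this PR), the sketch below wraps a block in a future that can also be cancelled.  It uses only the Scala standard library; `CancellableTask` and `CancellableTaskDemo` are hypothetical names, and no Spark job groups are involved.

    ```scala
    import scala.concurrent.{Await, Future, Promise}
    import scala.concurrent.duration._
    import scala.util.{Failure, Success, Try}

    // Hypothetical stand-in for the idea behind RunAsyncResult: run `body` on a
    // dedicated thread and expose both a Future and a cancel() method.
    final class CancellableTask[T](body: => T) {
      private val promise = Promise[T]()

      private val worker = new Thread(new Runnable {
        def run(): Unit = {
          // Capture every outcome, including InterruptedException.
          val result: Try[T] =
            try Success(body) catch { case t: Throwable => Failure(t) }
          // A no-op if cancel() has already failed the promise.
          promise.tryComplete(result)
        }
      })
      worker.setDaemon(true)
      worker.start()

      def future: Future[T] = promise.future

      // Fail the future first, then ask the worker thread to stop.
      def cancel(): Unit = {
        promise.tryFailure(new InterruptedException("task cancelled"))
        worker.interrupt()
      }
    }

    object CancellableTaskDemo {
      def main(args: Array[String]): Unit = {
        val quick = new CancellableTask({ 21 * 2 })
        println(Await.result(quick.future, 5.seconds))

        val slow = new CancellableTask({ Thread.sleep(60000); "finished" })
        slow.cancel()
        println(Await.ready(slow.future, 5.seconds).value)
      }
    }
    ```

    In this sketch `cancel()` fails the promise before interrupting the worker, so callers waiting on the future are notified even when the body ignores the interrupt; the worker thread itself may keep running in that case.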
    
    ### TODOS / tasks to finish before merging:
    
    This is marked as [WIP] since there are still a number of tasks that need to be finished before this is merge-worthy:
    
    - [ ] Add a Java interface for this; it can probably be an extension of `Runnable`.
    - [ ] Extend `RunAsyncResult` to expose the job ids of jobs launched from its computation (see [SPARK-3446](https://issues.apache.org/jira/browse/SPARK-3446) / #2337).
    - [ ] The thread-safety concerns here are somewhat complicated; add more comments.
    - [ ] Explain the caveats with job-group-based cancellation:
        - Having multiple threads submitting jobs in the same job group may lead to confusing behavior.
        - Recursive runAsync() calls may have undefined / confusing behavior.
    - [ ] Look through AsyncRDDActionsSuite to see whether there are any tests that need to be re-added for these APIs.
    - [ ] Add example uses in the Spark examples subproject, since it may not be obvious how to use this.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/JoshRosen/spark remove-asyncrddactions

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/2482.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2482
    
----
commit c715511f6bd7ac9ee6b30b4c3f238e59738c98ad
Author: Josh Rosen <jo...@apache.org>
Date:   2014-09-21T01:56:17Z

    WIP towards replacing AsyncRDDActions with a more general mechanism.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3626] [WIP] Replace AsyncRDDActions wit...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2482#issuecomment-56309826
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20632/consoleFull) for   PR 2482 at commit [`c715511`](https://github.com/apache/spark/commit/c715511f6bd7ac9ee6b30b4c3f238e59738c98ad).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3626] [WIP] Replace AsyncRDDActions wit...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/2482#issuecomment-56310596
  
    @rxin Do you have any examples of why a user might prefer the old model, besides backwards-compatibility?  I'd like to understand if the old model (in its current form) provides any features that this proposal is missing (so that I can add them).




[GitHub] spark pull request: [SPARK-3626] [WIP] Replace AsyncRDDActions wit...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/2482#issuecomment-56315058
  
    I've taken another pass at this.  This time, I kept AsyncRDDActions and re-implemented it using `runAsync`, although I'm on the fence about that change.  The one difference is that the asynchronous jobs will now be submitted with anonymous job groups rather than as part of the calling thread's job group.  This change might be observable by a user who writes a job that fires off multiple asynchronous actions from a single driver control thread, then attempts to cancel that thread's job group.  Because job groups don't have any hierarchy / nesting, cancelling the thread's job group would no longer cancel those jobs.
    
    I'm beginning to get the sense that we might not have much room to change anything about the implementation of AsyncRDDActions, so maybe we should just let them be.
    
    @rxin Based on our discussion, I added a check in DAGScheduler to reject jobs submitted by cancelled threads.




[GitHub] spark pull request: [SPARK-3626] [WIP] Replace AsyncRDDActions wit...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/2482#issuecomment-56309919
  
    I don't think we can just wipe the old one out. At the very least, we need to "deprecate" it. Even that is debatable because some applications might prefer this async model.





[GitHub] spark pull request: [SPARK-3626] [WIP] Replace AsyncRDDActions wit...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2482#issuecomment-56312285
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20633/consoleFull) for   PR 2482 at commit [`3171939`](https://github.com/apache/spark/commit/317193922391c191b3d84913d9d5ffe0bf5d3ad1).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3626] [WIP] Replace AsyncRDDActions wit...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2482#issuecomment-56310100
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20633/consoleFull) for   PR 2482 at commit [`3171939`](https://github.com/apache/spark/commit/317193922391c191b3d84913d9d5ffe0bf5d3ad1).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3626] [WIP] Replace AsyncRDDActions wit...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2482#discussion_r17827908
  
    --- Diff: core/src/main/scala/org/apache/spark/RunAsyncResult.scala ---
    @@ -0,0 +1,93 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark
    +
    +import scala.concurrent._
    +import scala.concurrent.duration.Duration
    +import scala.util.Try
    +
    +
    +/**
    + * This is an extension of the Scala Future interface to support cancellation.
    + */
    +class RunAsyncResult[T](jobGroupId: String,
    +                        jobGroupDescription: String,
    +                        sc: SparkContext,
    +                        func: => T) extends Future[T] {
    +
    +  // Pointer to the thread that is executing the action; it is set when the action is run.
    +  @volatile private var thread: Thread = _
    +
    +  // A promise used to signal the future.
    +  private val p = promise[T]()
    +
    +  /**
    +   * Cancel this Future and any Spark jobs launched from it.  The cancellation of Spark jobs is
    +   * performed asynchronously.
    +   */
    +  def cancel(): Unit = this.synchronized {
    +    if (thread != null) {
    +      thread.interrupt()
    --- End diff --
    
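    Note that this is not a reliable (actually very unreliable) way of cancelling a thread.
    
    This only stops the thread if it is blocked in an interruptible call (for example sleeping, waiting, or interruptible I/O). If the user thread is actually executing code (or busy looping), interrupt() merely sets the thread's interrupt flag, which has no effect unless the code checks it.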
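
    The behaviour described above can be reproduced with a small standalone program.  This is a minimal sketch that uses only the JVM's `Thread` API; `InterruptDemo` and `stopsAfterInterrupt` are hypothetical names introduced for the illustration and are not part of Spark.

    ```scala
    import java.util.concurrent.atomic.AtomicBoolean

    object InterruptDemo {
      // Start `body` on a daemon thread, interrupt it, and report whether the
      // thread stopped within `waitMs` milliseconds.
      def stopsAfterInterrupt(waitMs: Long)(body: => Unit): Boolean = {
        val t = new Thread(new Runnable {
          def run(): Unit =
            try body
            catch { case _: InterruptedException => () } // a blocking call was interrupted
        })
        t.setDaemon(true) // a thread that never stops must not keep the JVM alive
        t.start()
        Thread.sleep(100) // give the body time to start running
        t.interrupt()
        t.join(waitMs)
        !t.isAlive
      }

      def main(args: Array[String]): Unit = {
        // Blocked in sleep(): interrupt() raises InterruptedException, so the thread stops.
        println("sleeping:         " + stopsAfterInterrupt(1000) { Thread.sleep(60000) })

        // Busy loop that never checks the flag: interrupt() only sets the flag.
        val release = new AtomicBoolean(false)
        println("busy, no check:   " + stopsAfterInterrupt(500) { while (!release.get()) {} })
        release.set(true) // let the busy thread finish

        // Busy loop that polls the flag: the loop exits cooperatively.
        println("busy, with check: " + stopsAfterInterrupt(1000) {
          while (!Thread.currentThread().isInterrupted) {}
        })
      }
    }
    ```

    Only the first and third bodies stop in response to the interrupt; the second keeps spinning until something other than the interrupt releases it.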




[GitHub] spark pull request: [SPARK-3626] [WIP] Replace AsyncRDDActions wit...

Posted by markhamstra <gi...@git.apache.org>.
Github user markhamstra commented on the pull request:

    https://github.com/apache/spark/pull/2482#issuecomment-56310259
  
    Yes, I know that they are now Experimental, but they weren't always so, since we didn't have the Experimental designation/policy when AsyncRDDActions was introduced.  And even though we can remove the old code without violating policy, we should recognize that any user code that is using the old code is likely to require non-trivial changes to confidently move to the new-style code.  Deprecation makes sense even if it's not strictly required.




[GitHub] spark pull request: [SPARK-3626] [WIP] Replace AsyncRDDActions wit...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2482#issuecomment-56314921
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20635/consoleFull) for   PR 2482 at commit [`4882082`](https://github.com/apache/spark/commit/48820826b985b5ff131c383fd2e286254256e0b7).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3626] [WIP] Replace AsyncRDDActions wit...

Posted by markhamstra <gi...@git.apache.org>.
Github user markhamstra commented on the pull request:

    https://github.com/apache/spark/pull/2482#issuecomment-56310103
  
    +1 @rxin 
    
    Just scanned through the code quickly, and I didn't immediately see anything that would preclude retaining and deprecating the old code while introducing the new (which looks pretty good!)




[GitHub] spark pull request: [SPARK-3626] [WIP] Replace AsyncRDDActions wit...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen closed the pull request at:

    https://github.com/apache/spark/pull/2482




[GitHub] spark pull request: [SPARK-3626] [WIP] Replace AsyncRDDActions wit...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2482#issuecomment-56316673
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20635/consoleFull) for   PR 2482 at commit [`4882082`](https://github.com/apache/spark/commit/48820826b985b5ff131c383fd2e286254256e0b7).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3626] [WIP] Replace AsyncRDDActions wit...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/2482#issuecomment-56310133
  
    Fair enough, although the `AsyncRDDActions` class was marked as `@Experimental` and the documentation for that annotation explicitly warns that experimental APIs might change or be removed even in minor releases:
    
    > ```scala
    > /**
    >  * An experimental user-facing API.
    >  *
    >  * Experimental API's might change or be removed in minor versions of Spark, or be adopted as
    >  * first-class Spark API's.
    >  *
    > ```
    
    On the other hand, the individual methods weren't marked as `@Experimental` and we _did_ provide an implicit conversion, so it's possible that users might have started relying on these APIs without realizing that they were experimental.




[GitHub] spark pull request: [SPARK-3626] [WIP] Replace AsyncRDDActions wit...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2482#issuecomment-56309790
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20632/consoleFull) for   PR 2482 at commit [`c715511`](https://github.com/apache/spark/commit/c715511f6bd7ac9ee6b30b4c3f238e59738c98ad).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3626] [WIP] Replace AsyncRDDActions wit...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2482#discussion_r17827924
  
    --- Diff: core/src/main/scala/org/apache/spark/RunAsyncResult.scala ---
    @@ -0,0 +1,93 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark
    +
    +import scala.concurrent._
    +import scala.concurrent.duration.Duration
    +import scala.util.Try
    +
    +
    +/**
    + * This is an extension of the Scala Future interface to support cancellation.
    + */
    +class RunAsyncResult[T](jobGroupId: String,
    +                        jobGroupDescription: String,
    +                        sc: SparkContext,
    +                        func: => T) extends Future[T] {
    +
    +  // Pointer to the thread that is executing the action; it is set when the action is run.
    +  @volatile private var thread: Thread = _
    +
    +  // A promise used to signal the future.
    +  private val p = promise[T]()
    +
    +  /**
    +   * Cancel this Future and any Spark jobs launched from it.  The cancellation of Spark jobs is
    +   * performed asynchronously.
    +   */
    +  def cancel(): Unit = this.synchronized {
    +    if (thread != null) {
    +      thread.interrupt()
    +      thread.join()
    +      thread = null
    +    }
    +    sc.cancelJobGroup(jobGroupId)
    --- End diff --
    
    this only works for jobs that have been submitted in the past, not jobs that will be submitted in the future?
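
    The gap being asked about can be modelled without Spark.  The sketch below is a toy scheduler, not Spark's DAGScheduler: `ToyScheduler`, `rejectAfterCancel`, and the other names are hypothetical.  It assumes that cancelling a group only removes jobs already registered, and shows how a later submission slips through unless the scheduler also remembers which groups were cancelled.

    ```scala
    import scala.collection.mutable

    // Hypothetical toy model of group-based cancellation; these names are not Spark's.
    final class ToyScheduler(rejectAfterCancel: Boolean) {
      private val running = mutable.Map.empty[Int, String] // jobId -> groupId
      private val cancelledGroups = mutable.Set.empty[String]
      private var nextId = 0

      // Returns Some(jobId) if the job was accepted, None if it was rejected.
      def submit(groupId: String): Option[Int] = synchronized {
        if (rejectAfterCancel && cancelledGroups(groupId)) None
        else {
          val id = nextId
          nextId += 1
          running(id) = groupId
          Some(id)
        }
      }

      // Cancels only the jobs that are registered at the time of the call.
      def cancelGroup(groupId: String): Unit = synchronized {
        cancelledGroups += groupId
        val ids = running.collect { case (id, g) if g == groupId => id }.toList
        running --= ids
      }

      def activeJobs(groupId: String): Int = synchronized {
        running.count { case (_, g) => g == groupId }
      }
    }

    object ToySchedulerDemo {
      def main(args: Array[String]): Unit = {
        val forgetful = new ToyScheduler(rejectAfterCancel = false)
        forgetful.submit("g")
        forgetful.cancelGroup("g")
        println(forgetful.activeJobs("g"))         // the earlier job is gone
        println(forgetful.submit("g").isDefined)   // a later job is still accepted

        val guarded = new ToyScheduler(rejectAfterCancel = true)
        guarded.cancelGroup("g")
        println(guarded.submit("g").isDefined)     // a later job is refused
      }
    }
    ```

    The `rejectAfterCancel` flag is only meant to contrast the two behaviours; whether and how a real scheduler should remember cancelled groups is exactly the design question raised here.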




[GitHub] spark pull request: [SPARK-3626] [WIP] Replace AsyncRDDActions wit...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/2482#issuecomment-57572018
  
    I'm going to close this for now.  My approach has some confusing semantics and may be more general than what most users need.

