Posted to commits@spark.apache.org by gu...@apache.org on 2020/05/01 01:10:08 UTC

[spark] branch branch-3.0 updated: [SPARK-31549][PYSPARK] Add a developer API invoking collect on Python RDD with user-specified job group

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new a281b9c  [SPARK-31549][PYSPARK] Add a developer API invoking collect on Python RDD with user-specified job group
a281b9c is described below

commit a281b9cc64910f7f708341e8379cd2878461186a
Author: Weichen Xu <we...@databricks.com>
AuthorDate: Fri May 1 10:08:16 2020 +0900

    [SPARK-31549][PYSPARK] Add a developer API invoking collect on Python RDD with user-specified job group
    
    ### What changes were proposed in this pull request?
    This PR adds a new API to the PySpark RDD class:
    
    def collectWithJobGroup(self, groupId, description, interruptOnCancel=False)
    
    This API does the same thing as `rdd.collect`, but it lets the caller specify the job group for the collect job.
    The purpose of adding this API is that if we write:
    
    ```
    sc.setJobGroup("group-id...")
    rdd.collect()
    ```
    the `setJobGroup` API in PySpark won't work correctly. This is related to a bug discussed in
    https://issues.apache.org/jira/browse/SPARK-31549
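    
    For illustration, here is a minimal usage sketch of the new API, assuming a running SparkContext; the group id and description strings are placeholders:
    
    ```
    from pyspark import SparkContext
    
    sc = SparkContext.getOrCreate()
    rdd = sc.parallelize(range(100))
    
    # Instead of calling sc.setJobGroup(...) and then rdd.collect(), pass the job
    # group directly to the collect call so it is attached to the right job.
    result = rdd.collectWithJobGroup(
        "group-id...", "collect with an explicit job group", interruptOnCancel=False)
    
    # The whole group can later be cancelled, e.g. from another thread:
    # sc.cancelJobGroup("group-id...")
    ```
    
    Because the job group is set within the same JVM call that runs the collect, it is attached to the intended job rather than to whichever JVM thread happened to serve the earlier `setJobGroup` call.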
    
    Note:
    
    This PR is a temporary workaround related to `PYSPARK_PIN_THREAD`, and a step toward migrating to `PYSPARK_PIN_THREAD` smoothly. It targets Spark 3.0.
    
    - `PYSPARK_PIN_THREAD` is unstable at the moment, and its instability affects whole PySpark applications.
    - It cannot be made a runtime configuration, as it has to be set before the JVM is launched (a sketch of this follows below).
    - There is a thread leak issue between Python and the JVM. We should address it, but it is not a release blocker for Spark 3.0 since the feature is experimental. I plan to handle it after Spark 3.0 for stability reasons.
    
    Once `PYSPARK_PIN_THREAD` is enabled by default, we should ideally remove this API. I plan to deprecate it in Spark 3.1.
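    
    For context, `PYSPARK_PIN_THREAD` is an environment variable, so a minimal sketch of enabling it looks like the following, assuming it is set from the driver script before the SparkContext (and therefore the JVM) is created; the app name is a placeholder:
    
    ```
    import os
    
    # Must be set before the JVM is launched, i.e. before any SparkContext exists;
    # it cannot be changed at runtime.
    os.environ["PYSPARK_PIN_THREAD"] = "true"
    
    from pyspark import SparkContext
    sc = SparkContext(appName="pinned-thread-demo")
    
    # With pinned thread mode on, each Python thread maps to its own JVM thread,
    # so sc.setJobGroup(...) applies to jobs submitted from the same Python thread.
    ```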
    
    ### Why are the changes needed?
    It fixes the bug described above.
    
    ### Does this PR introduce any user-facing change?
    Yes, it adds a developer API in PySpark: `pyspark.RDD.collectWithJobGroup`.
    
    ### How was this patch tested?
    Unit test.
    
    Closes #28395 from WeichenXu123/collect_with_job_group.
    
    Authored-by: Weichen Xu <we...@databricks.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
    (cherry picked from commit ee1de66fe4e05754ea3f33b75b83c54772b00112)
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 .../org/apache/spark/api/python/PythonRDD.scala    | 15 ++++++
 python/pyspark/rdd.py                              | 13 +++++
 python/pyspark/tests/test_rdd.py                   | 62 ++++++++++++++++++++++
 3 files changed, 90 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 6dc1721f..a577194 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -169,6 +169,21 @@ private[spark] object PythonRDD extends Logging {
   }
 
   /**
+   * A helper function to collect an RDD as an iterator, then serve it via socket.
+   * This method is similar to `PythonRDD.collectAndServe`, but the user can specify the job group id,
+   * job description, and interruptOnCancel option.
+   */
+  def collectAndServeWithJobGroup[T](
+      rdd: RDD[T],
+      groupId: String,
+      description: String,
+      interruptOnCancel: Boolean): Array[Any] = {
+    val sc = rdd.sparkContext
+    sc.setJobGroup(groupId, description, interruptOnCancel)
+    serveIterator(rdd.collect().iterator, s"serve RDD ${rdd.id}")
+  }
+
+  /**
    * A helper function to create a local RDD iterator and serve it via socket. Partitions are
    * collected as separate jobs, by order of index. Partition data is first requested by a
    * non-zero integer to start a collection job. The response is prefaced by an integer with 1
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 52ab86c..b5b72da 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -889,6 +889,19 @@ class RDD(object):
             sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
         return list(_load_from_socket(sock_info, self._jrdd_deserializer))
 
+    def collectWithJobGroup(self, groupId, description, interruptOnCancel=False):
+        """
+        .. note:: Experimental
+
+        When collecting an RDD, use this method to specify the job group for the collect job.
+
+        .. versionadded:: 3.0.0
+        """
+        with SCCallSiteSync(self.context) as css:
+            sock_info = self.ctx._jvm.PythonRDD.collectAndServeWithJobGroup(
+                self._jrdd.rdd(), groupId, description, interruptOnCancel)
+        return list(_load_from_socket(sock_info, self._jrdd_deserializer))
+
     def reduce(self, f):
         """
         Reduces the elements of this RDD using the specified commutative and
diff --git a/python/pyspark/tests/test_rdd.py b/python/pyspark/tests/test_rdd.py
index 15cc48ae2..6b11d68 100644
--- a/python/pyspark/tests/test_rdd.py
+++ b/python/pyspark/tests/test_rdd.py
@@ -784,6 +784,68 @@ class RDDTests(ReusedPySparkTestCase):
             self.assertEqual(i, next(it))
 
 
+    def test_multiple_group_jobs(self):
+        import threading
+        group_a = "job_ids_to_cancel"
+        group_b = "job_ids_to_run"
+
+        threads = []
+        thread_ids = range(4)
+        thread_ids_to_cancel = [i for i in thread_ids if i % 2 == 0]
+        thread_ids_to_run = [i for i in thread_ids if i % 2 != 0]
+
+        # A list which records whether job is cancelled.
+        # The index of the array is the thread index which job run in.
+        is_job_cancelled = [False for _ in thread_ids]
+
+        def run_job(job_group, index):
+            """
+            Executes a job in the group ``job_group``. Each job sleeps for 15
+            seconds and then exits.
+            """
+            try:
+                self.sc.parallelize([15]).map(lambda x: time.sleep(x)) \
+                    .collectWithJobGroup(job_group, "test rdd collect with setting job group")
+                is_job_cancelled[index] = False
+            except Exception:
+                # Assume that exception means job cancellation.
+                is_job_cancelled[index] = True
+
+        # Test if job succeeded when not cancelled.
+        run_job(group_a, 0)
+        self.assertFalse(is_job_cancelled[0])
+
+        # Run jobs
+        for i in thread_ids_to_cancel:
+            t = threading.Thread(target=run_job, args=(group_a, i))
+            t.start()
+            threads.append(t)
+
+        for i in thread_ids_to_run:
+            t = threading.Thread(target=run_job, args=(group_b, i))
+            t.start()
+            threads.append(t)
+
+        # Wait to make sure all jobs are executed.
+        time.sleep(3)
+        # And then, cancel one job group.
+        self.sc.cancelJobGroup(group_a)
+
+        # Wait until all threads launching jobs are finished.
+        for t in threads:
+            t.join()
+
+        for i in thread_ids_to_cancel:
+            self.assertTrue(
+                is_job_cancelled[i],
+                "Thread {i}: Job in group A was not cancelled.".format(i=i))
+
+        for i in thread_ids_to_run:
+            self.assertFalse(
+                is_job_cancelled[i],
+                "Thread {i}: Job in group B did not succeeded.".format(i=i))
+
+
 if __name__ == "__main__":
     import unittest
     from pyspark.tests.test_rdd import *

