Posted to commits@spark.apache.org by gu...@apache.org on 2023/06/27 04:40:57 UTC

[spark] branch master updated: [SPARK-44202][CORE] Add JobTag APIs to JavaSparkContext

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 201d2b1e127 [SPARK-44202][CORE] Add JobTag APIs to JavaSparkContext
201d2b1e127 is described below

commit 201d2b1e1278288f4c0b746279fe1c3146422492
Author: Hyukjin Kwon <gu...@apache.org>
AuthorDate: Mon Jun 26 21:40:46 2023 -0700

    [SPARK-44202][CORE] Add JobTag APIs to JavaSparkContext
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to add:
    
    - `SparkContext.setInterruptOnCancel(interruptOnCancel: Boolean): Unit`
    - `SparkContext.addJobTag(tag: String): Unit`
    - `SparkContext.removeJobTag(tag: String): Unit`
    - `SparkContext.getJobTags(): Set[String]`
    - `SparkContext.clearJobTags(): Unit`
    - `SparkContext.cancelJobsWithTag(tag: String): Unit`
    
    into the Java API (`JavaSparkContext`), as sketched below.
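    
    As a rough, hypothetical sketch (not part of this commit), the new Java-side calls are expected to be used like this; the class name, app name, and tag strings are made up for illustration:
    
    ```java
    import java.util.Set;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    
    public class JobTagExample {
      public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("job-tag-example").setMaster("local[2]");
        JavaSparkContext jsc = new JavaSparkContext(conf);
    
        // Tags are thread-local: they are attached to every job started by this thread.
        jsc.addJobTag("etl");
        jsc.addJobTag("nightly");
    
        Set<String> tags = jsc.getJobTags();  // contains "etl" and "nightly"
        System.out.println(tags);
    
        // Removing a tag that was never added is a no-op.
        jsc.removeJobTag("nightly");
    
        // Drop every tag set on this thread.
        jsc.clearJobTags();
    
        jsc.stop();
      }
    }
    ```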
    
    ### Why are the changes needed?
    
    For Java users. In addition, these will be used in the Python implementation in SPARK-44194.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, it adds new APIs in Java.
    
    ### How was this patch tested?
    
    They're aliases, so no new tests were added. These will be called at least once via PySpark when I work on SPARK-44194, so I'm not too worried about it.
    
    Closes #41753 from HyukjinKwon/SPARK-44202.
    
    Authored-by: Hyukjin Kwon <gu...@apache.org>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 .../main/scala/org/apache/spark/SparkContext.scala | 20 +++++++-
 .../apache/spark/api/java/JavaSparkContext.scala   | 55 ++++++++++++++++++++++
 2 files changed, 73 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index fe3fe1be429..24d788ff5bc 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -844,6 +844,8 @@ class SparkContext(config: SparkConf) extends Logging {
    * being called on the job's executor threads. This is useful to help ensure that the tasks
    * are actually stopped in a timely manner, but is off by default due to HDFS-1208, where HDFS
    * may respond to Thread.interrupt() by marking nodes as dead.
+   *
+   * @since 3.5.0
    */
   def setInterruptOnCancel(interruptOnCancel: Boolean): Unit = {
     setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, interruptOnCancel.toString)
@@ -853,6 +855,8 @@ class SparkContext(config: SparkConf) extends Logging {
    * Add a tag to be assigned to all the jobs started by this thread.
    *
    * @param tag The tag to be added. Cannot contain ',' (comma) character.
+   *
+   * @since 3.5.0
    */
   def addJobTag(tag: String): Unit = {
     SparkContext.throwIfInvalidTag(tag)
@@ -866,6 +870,8 @@ class SparkContext(config: SparkConf) extends Logging {
    * Noop if such a tag was not added earlier.
    *
    * @param tag The tag to be removed. Cannot contain ',' (comma) character.
+   *
+   * @since 3.5.0
    */
   def removeJobTag(tag: String): Unit = {
     SparkContext.throwIfInvalidTag(tag)
@@ -874,14 +880,22 @@ class SparkContext(config: SparkConf) extends Logging {
     setLocalProperty(SparkContext.SPARK_JOB_TAGS, newTags)
   }
 
-  /** Get the tags that are currently set to be assigned to all the jobs started by this thread. */
+  /**
+   * Get the tags that are currently set to be assigned to all the jobs started by this thread.
+   *
+   * @since 3.5.0
+   */
   def getJobTags(): Set[String] = {
     Option(getLocalProperty(SparkContext.SPARK_JOB_TAGS))
       .map(_.split(SparkContext.SPARK_JOB_TAGS_SEP).toSet)
       .getOrElse(Set())
   }
 
-  /** Clear the current thread's job tags. */
+  /**
+   * Clear the current thread's job tags.
+   *
+   * @since 3.5.0
+   */
   def clearJobTags(): Unit = {
     setLocalProperty(SparkContext.SPARK_JOB_TAGS, null)
   }
@@ -2532,6 +2546,8 @@ class SparkContext(config: SparkConf) extends Logging {
    * Cancel active jobs that have the specified tag. See `org.apache.spark.SparkContext.addJobTag`.
    *
   * @param tag The tag to be cancelled. Cannot contain ',' (comma) character.
+   *
+   * @since 3.5.0
    */
   def cancelJobsWithTag(tag: String): Unit = {
     SparkContext.throwIfInvalidTag(tag)
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 608158caacd..31703cdad5b 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -712,12 +712,67 @@ class JavaSparkContext(val sc: SparkContext) extends Closeable {
   /** Clear the current thread's job group ID and its description. */
   def clearJobGroup(): Unit = sc.clearJobGroup()
 
+  /**
+   * Set the behavior of job cancellation from jobs started in this thread.
+   *
+   * @param interruptOnCancel If true, then job cancellation will result in `Thread.interrupt()`
+   * being called on the job's executor threads. This is useful to help ensure that the tasks
+   * are actually stopped in a timely manner, but is off by default due to HDFS-1208, where HDFS
+   * may respond to Thread.interrupt() by marking nodes as dead.
+   *
+   * @since 3.5.0
+   */
+  def setInterruptOnCancel(interruptOnCancel: Boolean): Unit =
+    sc.setInterruptOnCancel(interruptOnCancel)
+
+  /**
+   * Add a tag to be assigned to all the jobs started by this thread.
+   *
+   * @param tag The tag to be added. Cannot contain ',' (comma) character.
+   *
+   * @since 3.5.0
+   */
+  def addJobTag(tag: String): Unit = sc.addJobTag(tag)
+
+  /**
+   * Remove a tag previously added to be assigned to all the jobs started by this thread.
+   * Noop if such a tag was not added earlier.
+   *
+   * @param tag The tag to be removed. Cannot contain ',' (comma) character.
+   *
+   * @since 3.5.0
+   */
+  def removeJobTag(tag: String): Unit = sc.removeJobTag(tag)
+
+  /**
+   * Get the tags that are currently set to be assigned to all the jobs started by this thread.
+   *
+   * @since 3.5.0
+   */
+  def getJobTags(): util.Set[String] = sc.getJobTags.asJava
+
+  /**
+   * Clear the current thread's job tags.
+   *
+   * @since 3.5.0
+   */
+  def clearJobTags(): Unit = sc.clearJobTags()
+
   /**
    * Cancel active jobs for the specified group. See
    * `org.apache.spark.api.java.JavaSparkContext.setJobGroup` for more information.
    */
   def cancelJobGroup(groupId: String): Unit = sc.cancelJobGroup(groupId)
 
+  /**
+   * Cancel active jobs that have the specified tag. See `org.apache.spark.SparkContext.addJobTag`.
+   *
+   * @param tag The tag to be cancelled. Cannot contain ',' (comma) character.
+   *
+   * @since 3.5.0
+   */
+  def cancelJobsWithTag(tag: String): Unit = sc.cancelJobsWithTag(tag)
+
   /** Cancel all jobs that have been scheduled or are running. */
   def cancelAllJobs(): Unit = sc.cancelAllJobs()
 

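For illustration, a minimal, hypothetical sketch of the cancellation flow these APIs enable from Java; the tag name, timings, and `CancelByTagExample` class are assumptions for the example, not part of the commit:

    import java.util.Arrays;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    public class CancelByTagExample {
      public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("cancel-by-tag").setMaster("local[2]");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        Thread worker = new Thread(() -> {
          // Both properties are thread-local, so set them in the job-submitting thread.
          jsc.setInterruptOnCancel(true);   // interrupt task threads on cancel (off by default, see HDFS-1208)
          jsc.addJobTag("long-running");    // hypothetical tag name
          try {
            jsc.parallelize(Arrays.asList(1, 2, 3), 3)
               .map(x -> { Thread.sleep(60_000); return x; })  // simulate slow tasks
               .collect();
          } catch (Exception e) {
            System.out.println("Job was cancelled: " + e.getMessage());
          }
        });
        worker.start();

        Thread.sleep(5_000);                    // give the job time to start
        jsc.cancelJobsWithTag("long-running");  // cancels only jobs carrying this tag
        worker.join();
        jsc.stop();
      }
    }

Unlike `setJobGroup`, which assigns a single group ID per thread, a thread can carry several tags at once, as `getJobTags()` returning a `Set` suggests.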

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org