You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2023/06/27 04:40:57 UTC
[spark] branch master updated: [SPARK-44202][CORE] Add JobTag APIs to JavaSparkContext
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 201d2b1e127 [SPARK-44202][CORE] Add JobTag APIs to JavaSparkContext
201d2b1e127 is described below
commit 201d2b1e1278288f4c0b746279fe1c3146422492
Author: Hyukjin Kwon <gu...@apache.org>
AuthorDate: Mon Jun 26 21:40:46 2023 -0700
[SPARK-44202][CORE] Add JobTag APIs to JavaSparkContext
### What changes were proposed in this pull request?
This PR proposes to expose the following `SparkContext` methods in the Java API (`JavaSparkContext`):
- `SparkContext.setInterruptOnCancel(interruptOnCancel: Boolean): Unit`
- `SparkContext.addJobTag(tag: String): Unit`
- `SparkContext.removeJobTag(tag: String): Unit`
- `SparkContext.getJobTags(): Set[String]` (returned as a `java.util.Set[String]` in `JavaSparkContext`)
- `SparkContext.clearJobTags(): Unit`
- `SparkContext.cancelJobsWithTag(tag: String): Unit`
### Why are the changes needed?
To make these APIs available to Java users. In addition, they will be used by the Python implementation in SPARK-44194.
### Does this PR introduce _any_ user-facing change?
Yes, it adds new APIs to the Java API (`JavaSparkContext`).
### How was this patch tested?
No tests were added because these methods are thin aliases of the existing `SparkContext` APIs. They will be exercised at least once via PySpark as part of SPARK-44194.
Closes #41753 from HyukjinKwon/SPARK-44202.
Authored-by: Hyukjin Kwon <gu...@apache.org>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
.../main/scala/org/apache/spark/SparkContext.scala | 20 +++++++-
.../apache/spark/api/java/JavaSparkContext.scala | 55 ++++++++++++++++++++++
2 files changed, 73 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index fe3fe1be429..24d788ff5bc 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -844,6 +844,8 @@ class SparkContext(config: SparkConf) extends Logging {
* being called on the job's executor threads. This is useful to help ensure that the tasks
* are actually stopped in a timely manner, but is off by default due to HDFS-1208, where HDFS
* may respond to Thread.interrupt() by marking nodes as dead.
+ *
+ * @since 3.5.0
*/
def setInterruptOnCancel(interruptOnCancel: Boolean): Unit = {
setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, interruptOnCancel.toString)
@@ -853,6 +855,8 @@ class SparkContext(config: SparkConf) extends Logging {
* Add a tag to be assigned to all the jobs started by this thread.
*
* @param tag The tag to be added. Cannot contain ',' (comma) character.
+ *
+ * @since 3.5.0
*/
def addJobTag(tag: String): Unit = {
SparkContext.throwIfInvalidTag(tag)
@@ -866,6 +870,8 @@ class SparkContext(config: SparkConf) extends Logging {
* Noop if such a tag was not added earlier.
*
* @param tag The tag to be removed. Cannot contain ',' (comma) character.
+ *
+ * @since 3.5.0
*/
def removeJobTag(tag: String): Unit = {
SparkContext.throwIfInvalidTag(tag)
@@ -874,14 +880,22 @@ class SparkContext(config: SparkConf) extends Logging {
setLocalProperty(SparkContext.SPARK_JOB_TAGS, newTags)
}
- /** Get the tags that are currently set to be assigned to all the jobs started by this thread. */
+ /**
+ * Get the tags that are currently set to be assigned to all the jobs started by this thread.
+ *
+ * @since 3.5.0
+ */
def getJobTags(): Set[String] = {
Option(getLocalProperty(SparkContext.SPARK_JOB_TAGS))
.map(_.split(SparkContext.SPARK_JOB_TAGS_SEP).toSet)
.getOrElse(Set())
}
- /** Clear the current thread's job tags. */
+ /**
+ * Clear the current thread's job tags.
+ *
+ * @since 3.5.0
+ */
def clearJobTags(): Unit = {
setLocalProperty(SparkContext.SPARK_JOB_TAGS, null)
}
@@ -2532,6 +2546,8 @@ class SparkContext(config: SparkConf) extends Logging {
* Cancel active jobs that have the specified tag. See `org.apache.spark.SparkContext.addJobTag`.
*
* @param tag The tag to be added. Cannot contain ',' (comma) character.
+ *
+ * @since 3.5.0
*/
def cancelJobsWithTag(tag: String): Unit = {
SparkContext.throwIfInvalidTag(tag)
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 608158caacd..31703cdad5b 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -712,12 +712,67 @@ class JavaSparkContext(val sc: SparkContext) extends Closeable {
/** Clear the current thread's job group ID and its description. */
def clearJobGroup(): Unit = sc.clearJobGroup()
+ /**
+ * Set the behavior of job cancellation from jobs started in this thread.
+ *
+ * @param interruptOnCancel If true, then job cancellation will result in `Thread.interrupt()`
+ * being called on the job's executor threads. This is useful to help ensure that the tasks
+ * are actually stopped in a timely manner, but is off by default due to HDFS-1208, where HDFS
+ * may respond to Thread.interrupt() by marking nodes as dead.
+ *
+ * @since 3.5.0
+ */
+ def setInterruptOnCancel(interruptOnCancel: Boolean): Unit =
+ sc.setInterruptOnCancel(interruptOnCancel)
+
+ /**
+ * Add a tag to be assigned to all the jobs started by this thread.
+ *
+ * @param tag The tag to be added. Cannot contain ',' (comma) character.
+ *
+ * @since 3.5.0
+ */
+ def addJobTag(tag: String): Unit = sc.addJobTag(tag)
+
+ /**
+ * Remove a tag previously added to be assigned to all the jobs started by this thread.
+ * Noop if such a tag was not added earlier.
+ *
+ * @param tag The tag to be removed. Cannot contain ',' (comma) character.
+ *
+ * @since 3.5.0
+ */
+ def removeJobTag(tag: String): Unit = sc.removeJobTag(tag)
+
+ /**
+ * Get the tags that are currently set to be assigned to all the jobs started by this thread.
+ *
+ * @since 3.5.0
+ */
+ def getJobTags(): util.Set[String] = sc.getJobTags().asJava
+
+ /**
+ * Clear the current thread's job tags.
+ *
+ * @since 3.5.0
+ */
+ def clearJobTags(): Unit = sc.clearJobTags()
+
/**
* Cancel active jobs for the specified group. See
* `org.apache.spark.api.java.JavaSparkContext.setJobGroup` for more information.
*/
def cancelJobGroup(groupId: String): Unit = sc.cancelJobGroup(groupId)
+ /**
+ * Cancel active jobs that have the specified tag. See `JavaSparkContext.addJobTag`.
+ *
+ * @param tag The tag of the jobs to cancel. Cannot contain ',' (comma) character.
+ *
+ * @since 3.5.0
+ */
+ def cancelJobsWithTag(tag: String): Unit = sc.cancelJobsWithTag(tag)
+
/** Cancel all jobs that have been scheduled or are running. */
def cancelAllJobs(): Unit = sc.cancelAllJobs()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org