Posted to commits@spark.apache.org by do...@apache.org on 2021/07/18 22:42:42 UTC
[spark] branch master updated: [SPARK-36014][K8S] Use uuid as app id in kubernetes client mode
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new fe94bf0 [SPARK-36014][K8S] Use uuid as app id in kubernetes client mode
fe94bf0 is described below
commit fe94bf07f9acec302e7d8becd7e576c777337331
Author: ulysses-you <ul...@gmail.com>
AuthorDate: Sun Jul 18 15:41:47 2021 -0700
[SPARK-36014][K8S] Use uuid as app id in kubernetes client mode
### What changes were proposed in this pull request?
Use a UUID instead of `System.currentTimeMillis` as the app id in Kubernetes client mode.
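For illustration, a minimal sketch of the id generation before and after this patch (the `oldAppId`/`newAppId` names are just for this example, not part of the patch):
```
import java.util.UUID

// Before: the client-mode default, which can collide when two apps
// start within the same millisecond.
val oldAppId = s"spark-application-${System.currentTimeMillis}"

// After: a random UUID with the hyphens stripped, as added to KubernetesConf.
val newAppId = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
```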
### Why are the changes needed?
Currently, Spark on Kubernetes in client mode uses `"spark-application-" + System.currentTimeMillis` as the app id by default, which causes app id conflicts when several Spark applications are submitted to the same Kubernetes cluster within a short time.
Unfortunately, the event log uses the app id as its file name, so a conflicting event log file causes the exception below.
```
Caused by: java.io.FileNotFoundException: File does not exist: xxx/spark-application-1624766876324.lz4.inprogress (inode 5984170846) Holder does not have any open files.
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2697)
at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.analyzeFileState(FSDirWriteFileOp.java:521)
at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.validateAddBlock(FSDirWriteFileOp.java:161)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2579)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:846)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:510)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:503)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:989)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:871)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:817)
```
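For illustration, a minimal sketch of how the collision arises (the `eventLogDir` value below is an assumed configuration, not from this patch):
```
// Two submissions landing in the same millisecond compute the same app id...
val submittedAt = System.currentTimeMillis
val appIdA = s"spark-application-$submittedAt"
val appIdB = s"spark-application-$submittedAt"
assert(appIdA == appIdB)  // identical ids => identical event log file names

// ...and therefore the same in-progress event log path, so the second
// writer trips over the first one's file, as in the stack trace above.
val eventLogDir = "hdfs:///shared/spark-logs"  // assumed setting
val logPath = s"$eventLogDir/$appIdA.lz4.inprogress"
```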
### Does this PR introduce _any_ user-facing change?
Yes. In client mode, the default app id format changes from `spark-application-<timestamp>` to `spark-<uuid>`.
### How was this patch tested?
Manual test:
![image](https://user-images.githubusercontent.com/12025282/124435341-7a88e180-dda7-11eb-8e62-bdfec6a0ee3b.png)
Closes #33211 from ulysses-you/k8s-appid.
Authored-by: ulysses-you <ul...@gmail.com>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala | 5 ++++-
.../spark/deploy/k8s/submit/KubernetesClientApplication.scala | 4 +---
.../scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala | 7 ++++---
3 files changed, 9 insertions(+), 7 deletions(-)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
index 937c5f5..de084da 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.deploy.k8s
-import java.util.Locale
+import java.util.{Locale, UUID}
import io.fabric8.kubernetes.api.model.{LocalObjectReference, LocalObjectReferenceBuilder, Pod}
@@ -225,6 +225,9 @@ private[spark] object KubernetesConf {
new KubernetesExecutorConf(sparkConf.clone(), appId, executorId, driverPod, resourceProfileId)
}
+ def getKubernetesAppId(): String =
+ s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
+
def getResourceNamePrefix(appName: String): String = {
val id = KubernetesUtils.uniqueID()
s"$appName-$id"
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index 3140502..e3b80b1 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -16,8 +16,6 @@
*/
package org.apache.spark.deploy.k8s.submit
-import java.util.UUID
-
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.control.Breaks._
@@ -191,7 +189,7 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
// to be added as a label to group resources belonging to the same application. Label values are
// considerably restrictive, e.g. must be no longer than 63 characters in length. So we generate
// a unique app ID (captured by spark.app.id) in the format below.
- val kubernetesAppId = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
+ val kubernetesAppId = KubernetesConf.getKubernetesAppId()
val kubernetesConf = KubernetesConf.createDriverConf(
sparkConf,
kubernetesAppId,
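As a quick check on the 63-character label limit mentioned in the comment above (this snippet is an illustration of ours, not part of the patch), the generated id always has a fixed length of 38 characters:
```
import java.util.UUID

// "spark-" (6 chars) + 32 hex chars (a 36-char UUID minus 4 hyphens) = 38,
// comfortably under the 63-character Kubernetes label value limit.
val id = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
assert(id.length == 38)
```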
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index 5dad6a3..42a9300 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -24,9 +24,9 @@ import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.KubernetesClient
import org.apache.spark.SparkContext
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesUtils}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.KubernetesUtils
import org.apache.spark.deploy.k8s.submit.KubernetesClientUtils
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.config.SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO
@@ -48,6 +48,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
watchEvents: ExecutorPodsWatchSnapshotSource,
pollEvents: ExecutorPodsPollingSnapshotSource)
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {
+ private val appId = KubernetesConf.getKubernetesAppId()
protected override val minRegisteredRatio =
if (conf.get(SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO).isEmpty) {
@@ -83,12 +84,12 @@ private[spark] class KubernetesClusterSchedulerBackend(
/**
* Get an application ID associated with the job.
* This returns the string value of spark.app.id if set, otherwise
- * the locally-generated ID from the superclass.
+ * the locally-generated ID.
*
* @return The application ID
*/
override def applicationId(): String = {
- conf.getOption("spark.app.id").map(_.toString).getOrElse(super.applicationId)
+ conf.getOption("spark.app.id").map(_.toString).getOrElse(appId)
}
override def start(): Unit = {