Posted to issues@spark.apache.org by "Yury (Jira)" <ji...@apache.org> on 2022/05/06 14:27:00 UTC

[jira] [Updated] (SPARK-39115) Can't submit second Spark job K8S using InProcesslauncher

     [ https://issues.apache.org/jira/browse/SPARK-39115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yury updated SPARK-39115:
-------------------------
    Description: 
We tried to use Spark's InProcessLauncher to submit Spark jobs on Kubernetes. The goal was to launch several jobs from within the same Java process. The first job runs successfully, but the second one fails with:

{code:bash}
2022-05-06 10:00:16.867 [spark-app-2: '...parkEngineDriver'] WARN  o.a.s.launcher.InProcessAppHandle - Application failed with exception.
io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: PUT at: https://0.0.0.0:37811/api/v1/namespaces/connectivity/configmaps/spark-drv-22fc5d8099ab53dc-conf-map. Message: ConfigMap "spark-drv-22fc5d8099ab53dc-conf-map" is invalid: data: Forbidden: field is immutable when `immutable` is set. Received status: Status(apiVersion=v1, code=422, details=StatusDetails(causes=[StatusCause(field=data, message=Forbidden: field is immutable when `immutable` is set, reason=FieldValueForbidden, additionalProperties={})], group=null, kind=ConfigMap, name=spark-drv-22fc5d8099ab53dc-conf-map, retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, message=ConfigMap "spark-drv-22fc5d8099ab53dc-conf-map" is invalid: data: Forbidden: field is immutable when `immutable` is set, metadata=ListMeta(_continue=null, remainingItemCount=null, resourceVersion=null, selfLink=null, additionalProperties={}), reason=Invalid, status=Failure, additionalProperties={}).
    at io.fabric8.kubernetes.client.dsl.base.OperationSupport.requestFailure(OperationSupport.java:639)
    at io.fabric8.kubernetes.client.dsl.base.OperationSupport.assertResponseCode(OperationSupport.java:578)
    at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:543)
    at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:504)
    at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleUpdate(OperationSupport.java:330)
    at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleUpdate(OperationSupport.java:310)
    at io.fabric8.kubernetes.client.dsl.base.BaseOperation.handleUpdate(BaseOperation.java:898)
    at io.fabric8.kubernetes.client.dsl.base.HasMetadataOperation.lambda$replace$0(HasMetadataOperation.java:132)
    at io.fabric8.kubernetes.client.dsl.base.HasMetadataOperation.replace(HasMetadataOperation.java:137)
    at io.fabric8.kubernetes.client.dsl.base.HasMetadataOperation.replace(HasMetadataOperation.java:97)
    at io.fabric8.kubernetes.client.dsl.base.HasMetadataOperation.replace(HasMetadataOperation.java:38)
    at io.fabric8.kubernetes.client.handlers.core.v1.ConfigMapHandler.replace(ConfigMapHandler.java:44)
    at io.fabric8.kubernetes.client.handlers.core.v1.ConfigMapHandler.replace(ConfigMapHandler.java:25)
    at io.fabric8.kubernetes.client.utils.CreateOrReplaceHelper.lambda$createOrReplaceItem$1(CreateOrReplaceHelper.java:78)
    at io.fabric8.kubernetes.client.utils.CreateOrReplaceHelper.replace(CreateOrReplaceHelper.java:96)
    at io.fabric8.kubernetes.client.utils.CreateOrReplaceHelper.createOrReplace(CreateOrReplaceHelper.java:69)
    at io.fabric8.kubernetes.client.utils.CreateOrReplaceHelper.createOrReplaceItem(CreateOrReplaceHelper.java:91)
    at io.fabric8.kubernetes.client.dsl.internal.NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.createOrReplaceOrDeleteExisting(NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.java:454)
    at io.fabric8.kubernetes.client.dsl.internal.NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.createOrReplace(NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.java:297)
    at io.fabric8.kubernetes.client.dsl.internal.NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.createOrReplace(NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.java:66)
    at org.apache.spark.deploy.k8s.submit.Client.run(KubernetesClientApplication.scala:150)
    at org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.$anonfun$run$4(KubernetesClientApplication.scala:220)
    at org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.$anonfun$run$4$adapted(KubernetesClientApplication.scala:214)
    at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2713)
    at org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.run(KubernetesClientApplication.scala:214)
    at org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.start(KubernetesClientApplication.scala:186)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:955)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.InProcessSparkSubmit$.main(SparkSubmit.scala:984)
    at org.apache.spark.deploy.InProcessSparkSubmit.main(SparkSubmit.scala)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at org.apache.spark.launcher.InProcessAppHandle.lambda$start$0(InProcessAppHandle.java:72)
    at java.base/java.lang.Thread.run(Thread.java:833) 
{code}

Attaching code example:

{code:scala}
import org.apache.spark.launcher.InProcessLauncher

// Placeholders in braces must be replaced with real values.
val launcher = new InProcessLauncher()
  .setMaster("k8s://https://0.0.0.0:37811")
  .setDeployMode("cluster")
  .setAppResource("{location_to_pi_example}")
  .setMainClass("org.apache.spark.examples.SparkPi")
  .setConf("spark.kubernetes.container.image", "{spark_pi_example_image}")
  .setConf("spark.kubernetes.authenticate.driver.serviceAccountName", "spark")
  .setConf("spark.kubernetes.namespace", "default")
  .setConf("spark.kubernetes.executor.disableConfigMap", "true")
  .setConf("spark.kubernetes.container.image.pullPolicy", "Always")

// Load the remaining settings from spark-defaults.conf on the classpath.
launcher.setPropertiesFile(getClass.getResource("/spark-defaults.conf").getPath)
{code}

After some research, I found these lines in org.apache.spark.deploy.k8s.submit.KubernetesClientUtils.scala:

{code:scala}
  val configMapNameExecutor: String = configMapName(s"spark-exec-${KubernetesUtils.uniqueID()}")

  val configMapNameDriver: String = configMapName(s"spark-drv-${KubernetesUtils.uniqueID()}")
{code}

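Switching both from `val` to `def` solves the issue. As `val`s, the names appear to be computed only once per JVM, so the second in-process submission reuses the first job's ConfigMap name and tries to replace a ConfigMap that is marked immutable.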
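
The difference between the two declarations can be illustrated with a small, self-contained sketch. It does not use Spark or Kubernetes: `Ids`, `NamesWithVal`, `NamesWithDef`, and `FakeNamespace` are hypothetical stand-ins invented for this example, and the toy namespace only imitates the rejection of changed data in an immutable ConfigMap.

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable

// Hypothetical stand-in for KubernetesUtils.uniqueID(); not Spark code.
object Ids {
  private val counter = new AtomicLong(0)
  def uniqueID(): String = counter.incrementAndGet().toString
}

object NamesWithVal {
  // A val in an object is evaluated once, when the object is first initialized.
  val configMapNameDriver: String = s"spark-drv-${Ids.uniqueID()}-conf-map"
}

object NamesWithDef {
  // A def is re-evaluated on every access, so each caller gets a fresh name.
  def configMapNameDriver: String = s"spark-drv-${Ids.uniqueID()}-conf-map"
}

// Toy model of a namespace whose ConfigMaps are all immutable (assumption).
class FakeNamespace {
  private val configMaps = mutable.Map.empty[String, String]

  // Rejects any attempt to change the data of an existing ConfigMap.
  def createOrReplace(name: String, data: String): Either[String, Unit] =
    configMaps.get(name) match {
      case Some(existing) if existing != data =>
        Left(s"ConfigMap $name is invalid: data is immutable")
      case _ =>
        configMaps(name) = data
        Right(())
    }
}

object ValVsDefDemo {
  def main(args: Array[String]): Unit = {
    val withVal = new FakeNamespace
    println(withVal.createOrReplace(NamesWithVal.configMapNameDriver, "job-1").isRight) // true
    println(withVal.createOrReplace(NamesWithVal.configMapNameDriver, "job-2").isRight) // false

    val withDef = new FakeNamespace
    println(withDef.createOrReplace(NamesWithDef.configMapNameDriver, "job-1").isRight) // true
    println(withDef.createOrReplace(NamesWithDef.configMapNameDriver, "job-2").isRight) // true
  }
}
```

With the `val` variant, both submissions resolve to the same name and the second one is rejected. With the `def` variant, each submission gets its own name, which is consistent with the behaviour observed after the change.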


> Can't submit second Spark job K8S using InProcesslauncher
> ---------------------------------------------------------
>
>                 Key: SPARK-39115
>                 URL: https://issues.apache.org/jira/browse/SPARK-39115
>             Project: Spark
>          Issue Type: Bug
>          Components: Kubernetes
>    Affects Versions: 3.2.0, 3.2.1
>            Reporter: Yury
>            Priority: Major


