Posted to issues@spark.apache.org by "Rosie Bloxsom (Jira)" <ji...@apache.org> on 2020/09/03 15:22:00 UTC

[jira] [Created] (SPARK-32790) FINISHED state of application is not final

Rosie Bloxsom created SPARK-32790:
-------------------------------------

             Summary: FINISHED state of application is not final
                 Key: SPARK-32790
                 URL: https://issues.apache.org/jira/browse/SPARK-32790
             Project: Spark
          Issue Type: Bug
          Components: Spark Submit
    Affects Versions: 2.4.4
         Environment: Spark 2.4.4 (and probably every version since, from looking at the code?)

On a local machine.
            Reporter: Rosie Bloxsom


If you launch an application with SparkLauncher.startApplication, and pass a listener to listen for state changes on the returned handle, there are supposed to be two possible "final" states:
 * FINISHED, denoting success
 * FAILED, denoting a failure

Because they are final, if you receive a FINISHED signal you should be able to proceed as if there was no error.

Unfortunately, this code:
https://github.com/apache/spark/blob/233c214a752771f5d8ca9fb2aea93cf1776a552d/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java#L128
which I think is related to decisions from this previous issue: https://github.com/apache/spark/pull/18877
means that in case of an error, a FINISHED event is sent, followed shortly thereafter by a FAILED event, and both of these events are "final".

I'm not sure if there's a way to fix it so that only one event is sent - ideally when the child process fails, we would only send FAILED, rather than sending "FINISHED" first? If we can't change it, then at least we should update the docs to explain what happens, and maybe change the definition of isFinal?
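In the meantime, a caller-side workaround might be to treat FINISHED as provisional and only accept it if no FAILED follows within a short grace period. The following is a minimal sketch of that idea, not anything Spark provides: the class name SettledOutcome and the graceMillis parameter are hypothetical, the state is passed in by name so the sketch runs without Spark on the classpath, and treating KILLED and LOST as failures is an assumption.

```scala
import java.util.{Timer, TimerTask}
import scala.concurrent.{Future, Promise}

// Hypothetical helper (not part of Spark): settles the outcome of a launch
// from the names of the states that a SparkAppHandle.Listener observes.
class SettledOutcome(graceMillis: Long) {
  private val promise = Promise[Unit]()

  /** Completes once the outcome is considered settled. */
  val future: Future[Unit] = promise.future

  /** Feed in each observed state name, in the order it is received. */
  def onState(stateName: String): Unit = stateName match {
    case "FINISHED" =>
      // Do not trust FINISHED right away: give a trailing FAILED time to arrive.
      val timer = new Timer("settled-outcome", true) // daemon thread
      timer.schedule(new TimerTask {
        override def run(): Unit = {
          promise.trySuccess(()) // no-op if a failure was recorded first
          timer.cancel()         // lets the timer thread exit
        }
      }, graceMillis)
    case "FAILED" | "KILLED" | "LOST" =>
      // Assumption: every final state other than FINISHED counts as a failure.
      promise.tryFailure(new RuntimeException(s"Spark run ended in state $stateName"))
    case _ =>
      () // CONNECTED, RUNNING, etc. are not final; nothing to do
  }
}
```

A listener's stateChanged could then call something like outcome.onState(handle.getState.name) instead of sleeping on the listener thread. This only narrows the race: the grace period is still a guess about how long the FAILED event can lag behind FINISHED.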

To reproduce, install Spark 2.4.4 and run this Scala code using one of the Spark example jars. It shows the transitions between states for a Spark application that fails trivially. The states received are:

{noformat}
Received event updating state to CONNECTED
Received event updating state to RUNNING
Received event updating state to FINISHED
Received event updating state to FAILED
{noformat}


{code:scala}
package foo

import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}
import org.scalatest.flatspec.AnyFlatSpecLike
import org.scalatest.matchers.should.Matchers
import scala.concurrent.duration._
import scala.concurrent.{Await, Promise}

class FinishedStateNotFinalSpec extends AnyFlatSpecLike with Matchers {
  "it" should "enter FAILED state without entering into FINISHED state" in {

    val examplesJar = "file:/C:/spark/spark-2.4.4-bin-hadoop2.7/spark-2.4.4-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.4.4.jar"

    val launcher = new SparkLauncher()
      .setSparkHome("""C:\spark\spark-2.4.4-bin-hadoop2.7\spark-2.4.4-bin-hadoop2.7""")
      .setAppResource(examplesJar)
      .redirectError()
      .redirectOutput(java.io.File.createTempFile("spark-error", "log"))
      .setAppName("Test")
      .setMaster("local[1]")
      .setMainClass("org.apache.spark.examples.SparkPi")
      .addAppArgs("This causes an error, because it should be a number not a string")

    val sparkCompletionPromise = Promise[Unit]()

    launcher.startApplication(new SparkAppListener(sparkCompletionPromise))

    Await.result(sparkCompletionPromise.future, 100000.millis)

    // check in the console output to see which states were entered
  }
}

class SparkAppListener(promise: Promise[Unit]) extends SparkAppHandle.Listener {

  override def stateChanged(handle: SparkAppHandle): Unit = {
    val appState = handle.getState
    println(s"Received event updating state to $appState")
    if (appState.isFinal && appState == SparkAppHandle.State.FINISHED) {
      // Without this sleep, my program continues as if the spark-submit was a success.
      // With this sleep, there is a chance for the correct "FAILED" state to be registered.
      // But we shouldn't need this sleep, we should receive the FAILED state as the only "final" state.
      Thread.sleep(1000)
      // () is the Unit value; `Unit` would pass the companion object instead
      promise.success(())
    }
    else if (appState.isFinal && appState == SparkAppHandle.State.FAILED) {
      promise.failure(new RuntimeException("Spark run failed"))
    }
  }

  override def infoChanged(handle: SparkAppHandle): Unit = {}
}


{code}



