Posted to commits@airflow.apache.org by "t oo (Jira)" <ji...@apache.org> on 2020/03/06 15:04:00 UTC

[jira] [Updated] (AIRFLOW-6994) SparkSubmitOperator re-launches Spark driver even when the original driver is still running

     [ https://issues.apache.org/jira/browse/AIRFLOW-6994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

t oo updated AIRFLOW-6994:
--------------------------
    Description: 
https://issues.apache.org/jira/browse/AIRFLOW-6229 introduced a bug.

Due to a temporary network blip in the connection to Spark, the state goes to UNKNOWN (as no tags are found in the curl response), which forces a retry.

Fix in spark_submit_hook.py:

{code:python}
    def _process_spark_status_log(self, itr):
        """
        Parses the logs of the spark driver status query process.

        :param itr: An iterator which iterates over the input of the subprocess
        """
        response_found = False
        driver_found = False
        # Consume the iterator
        for line in itr:
            line = line.strip()

            # A line containing "submissionId" means the status endpoint
            # actually answered, so a missing driverState is not just a
            # dropped connection.
            if "submissionId" in line:
                response_found = True

            # Check if the log line is about the driver status and extract the status.
            if "driverState" in line:
                self._driver_status = line.split(' : ')[1] \
                    .replace(',', '').replace('\"', '').strip()
                driver_found = True

            self.log.debug("spark driver status log: {}".format(line))

        # Only mark the driver UNKNOWN when a real response arrived without a
        # driverState tag; a network blip (no response at all) leaves the last
        # known status in place so polling can continue.
        if response_found and not driver_found:
            self._driver_status = "UNKNOWN"
{code}
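
For context, the status parsed here drives the hook's polling loop. Below is a condensed sketch paraphrasing _start_driver_status_tracking from the 1.10.x spark_submit_hook.py (retry bookkeeping and error handling omitted; see the source links further down): UNKNOWN is treated as a terminal state, so what _process_spark_status_log stores decides whether polling stops.

{code:python}
    def _start_driver_status_tracking(self):
        # Condensed paraphrase, not the verbatim hook code. Relies on the
        # module-level "time" and "subprocess" imports of spark_submit_hook.py.
        # Keep polling as long as the driver is progressing; UNKNOWN is
        # terminal, which is why mis-reporting it on a mere network blip
        # (AIRFLOW-6229) fails the task and triggers a retry that
        # re-launches the driver.
        while self._driver_status not in ["FINISHED", "UNKNOWN",
                                          "KILLED", "FAILED", "ERROR"]:
            time.sleep(1)
            poll_drive_status_cmd = self._build_track_driver_status_command()
            status_process = subprocess.Popen(poll_drive_status_cmd,
                                              stdout=subprocess.PIPE,
                                              stderr=subprocess.STDOUT,
                                              universal_newlines=True)
            self._process_spark_status_log(
                iter(status_process.stdout.readline, '')
            )
{code}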


  was:
You click ‘release’ on a new Spark cluster while the prior Spark cluster is still processing some spark-submits from Airflow. Airflow is then never able to finish the SparkSubmit task: it polls for status on the new Spark cluster build, which cannot find the status because the submit happened on the earlier build, so the status loop goes on forever.

 

[https://github.com/apache/airflow/blob/1.10.6/airflow/contrib/hooks/spark_submit_hook.py#L446]

[https://github.com/apache/airflow/blob/1.10.6/airflow/contrib/hooks/spark_submit_hook.py#L489]

It loops forever if it can’t find the driverState tag in the JSON response: the new build (pointed to by the released DNS name) does not know about the driver submitted on the previously released build, so the second response below does not contain the driverState tag.

  

#response before clicking release on new build

{code}
[ec2-user@reda ~]$ curl http://dns:6066/v1/submissions/status/driver-20191202142207-0000
{
  "action" : "SubmissionStatusResponse",
  "driverState" : "RUNNING",
  "serverSparkVersion" : "2.3.4",
  "submissionId" : "driver-20191202142207-0000",
  "success" : true,
  "workerHostPort" : "reda:31489",
  "workerId" : "worker-20191202133526-reda-31489"
}
{code}

 

#response after clicking release on new build

{code}
[ec2-user@reda ~]$ curl http://dns:6066/v1/submissions/status/driver-20191202142207-0000
{
  "action" : "SubmissionStatusResponse",
  "serverSparkVersion" : "2.3.4",
  "submissionId" : "driver-20191202142207-0000",
  "success" : false
}
{code}

               

 

This is definitely a defect in the current code. It can be fixed by modifying the _process_spark_status_log function to set the driver status to UNKNOWN if driverState is not in the response after iterating over all the lines.
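
As a quick sanity check, the amended logic can be exercised against both responses with a small stand-in function (a sketch of the fixed parsing, not the real hook class):

{code:python}
def parse_status(lines, last_status):
    """Stand-in for the amended _process_spark_status_log logic."""
    response_found = False
    driver_found = False
    status = last_status
    for line in lines:
        line = line.strip()
        if "submissionId" in line:
            response_found = True
        if "driverState" in line:
            status = line.split(' : ')[1].replace(',', '').replace('"', '').strip()
            driver_found = True
    # A response that arrived without a driverState tag yields UNKNOWN;
    # no response at all (network blip) keeps the last known status.
    if response_found and not driver_found:
        status = "UNKNOWN"
    return status

before = ['"submissionId" : "driver-20191202142207-0000",',
          '"driverState" : "RUNNING",']
after = ['"submissionId" : "driver-20191202142207-0000",']

assert parse_status(before, "SUBMITTED") == "RUNNING"   # healthy cluster
assert parse_status(after, "RUNNING") == "UNKNOWN"      # driver lost: fail fast
assert parse_status([], "RUNNING") == "RUNNING"         # blip: keep polling
{code}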

 


> SparkSubmitOperator re-launches Spark driver even when the original driver is still running
> -------------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-6994
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6994
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: scheduler
>    Affects Versions: 1.10.6
>            Reporter: t oo
>            Assignee: t oo
>            Priority: Major
>             Fix For: 1.10.8
>
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)